fix(a2a-queue): preserve 5-attempt cap on transient gateway-origin drain failures #3127

Merged
devops-engineer merged 4 commits from fix/a2a-queue-drain-gateway-misclass into main 2026-06-21 13:27:23 +00:00
6 changed files with 506 additions and 10 deletions
@@ -76,7 +76,7 @@ func TestDequeueNext_PreservesFullBody_NoTruncation(t *testing.T) {
// is that DequeueNext propagates it untouched into item.Body.
mock.ExpectBegin()
mock.ExpectQuery(
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "caller_id", "priority", "body", "method", "attempts",
@@ -52,6 +52,52 @@ type proxyDispatchBuildError struct{ err error }
// Error implements the error interface by returning the wrapped error's
// message unchanged.
func (e *proxyDispatchBuildError) Error() string {
	wrapped := e.err
	return wrapped.Error()
}
// isGatewayOriginFailure reports whether proxyErr looks like a transient
// gateway-origin failure (Cloudflare 5xx tunnel, "no healthy upstream",
// push-route blip) rather than a confirmed-dead workspace agent.
//
// Background (PM 2026-06-21 RCA): DrainQueueForWorkspace used to treat
// these transient 502/503/504 responses as generic "dead agent
// unreachable" failures, which burned the 5-attempt terminal cap on
// otherwise-healthy workspaces.
//
// The decision is taken in this order:
//   - nil proxyErr: there is no failure to classify, so false.
//   - Classification == "upstream_dead": false. The classification field
//     is authoritative when set; per the design notes it means the proxy
//     already confirmed the container is dead (maybeMarkContainerDead /
//     preflightContainerHealth), so the item MUST keep going through
//     MarkQueueItemFailed and the cap is allowed to fire.
//   - otherwise the status code is the fallback signal: the result is
//     isUpstreamDeadStatus(Status) (documented as 502/503/504/521-524).
//     A dead-origin status without a confirmed-dead container is the
//     gateway-origin family; any other status (other 5xx, 4xx) is not
//     and stays on the regular MarkQueueItemFailed path.
//
// The caller is expected to pair a true result with a recent-heartbeat
// check before re-queueing without burning an attempt.
func isGatewayOriginFailure(proxyErr *proxyA2AError) bool {
	switch {
	case proxyErr == nil:
		return false
	case proxyErr.Classification == "upstream_dead":
		return false
	default:
		return isUpstreamDeadStatus(proxyErr.Status)
	}
}
// invalidateCachedURLForDrain evicts the cached agent URL for workspaceID
// from Redis so the next drain tick re-resolves it from the DB. Called
// on transient gateway-origin failures where the cached URL is a likely
// contributor (stale mapping after a tunnel flap, container port change
// behind a CDN, etc.).
//
// The eviction is delegated to db.ClearWorkspaceKeys, which is documented
// as swallowing Redis errors internally (the platform's Redis layer is
// best-effort for the URL cache — a cache-miss is harmless, just slower).
// That is why nothing is returned or logged here; the helper exists
// mainly for symmetry with the other drain instrumentation.
//
// NOTE(review): the drain tests describe ClearWorkspaceKeys as removing
// all ws:<id>:* keys (liveness, url, internal_url), not just the URL —
// confirm that clearing the liveness key on a transient blip is intended.
func (h *WorkspaceHandler) invalidateCachedURLForDrain(ctx context.Context, workspaceID string) {
db.ClearWorkspaceKeys(ctx, workspaceID)
}
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
// runs the reactive container-health check, and records the outcome. Busy
// targets that are successfully queued are logged as queued, not failed.
+135 -6
View File
@@ -92,6 +92,20 @@ const (
a2aQueueSweeperStatusAlert = 10 // log a warning every N stranded items
)
// transientRetryBackoffSecs is how long a row re-queued by
// MarkQueueItemTransientRetry remains ineligible for re-dispatch,
// expressed in seconds (the integer form that is bound to PostgreSQL's
// make_interval(secs => $N) in that function's UPDATE).
//
// #3127 follow-up (Researcher REQUEST_CHANGES) — the transient-retry
// path requeues with status='queued', but the DrainQueueForWorkspace
// for-loop can iterate up to capacity times. Without this backoff, a
// capacity>1 drain would re-claim the just-requeued row on the next
// iteration and hit the same gateway failure again, in a tight loop.
// 5s is long enough to break that loop (sweeper interval is 10s,
// heartbeats typically every 5-30s) and short enough that recovery on
// the next heartbeat is not perceptibly delayed.
const transientRetryBackoffSecs = 5
// QueuedItem is what the heartbeat drain path pulls off the queue.
type QueuedItem struct {
ID string
@@ -223,7 +237,18 @@ func EnqueueA2A(
// 'dispatched'. Uses SELECT ... FOR UPDATE SKIP LOCKED so two concurrent
// drain calls don't both claim the same row.
//
// Returns (nil, nil) when the queue is empty — not an error.
// Honors a per-row next_attempt_at backoff (added in #3127 follow-up
// migration 20260621120000). Rows whose next_attempt_at is in the future
// are SKIPPED — they remain 'queued' but are not eligible for dispatch
// until the backoff expires. This is the gate that breaks the
// capacity>1 tight-retry loop on a flapping gateway: when
// MarkQueueItemTransientRetry sets next_attempt_at = now() + 5s, the
// same for-loop iteration that just requeued the row cannot re-dequeue
// it on the very next iteration even if the row is still highest
// priority.
//
// Returns (nil, nil) when the queue is empty (or all eligible rows are
// backoff-gated) — not an error.
func DequeueNext(ctx context.Context, workspaceID string) (*QueuedItem, error) {
tx, err := db.DB.BeginTx(ctx, nil)
if err != nil {
@@ -238,6 +263,7 @@ func DequeueNext(ctx context.Context, workspaceID string) (*QueuedItem, error) {
FROM a2a_queue
WHERE workspace_id = $1 AND status = 'queued'
AND (expires_at IS NULL OR expires_at > now())
AND (next_attempt_at IS NULL OR next_attempt_at <= now())
ORDER BY priority DESC, enqueued_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
@@ -305,6 +331,53 @@ func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
}
}
// MarkQueueItemTransientRetry returns a dispatched item to 'queued' WITHOUT
// burning the 5-attempt terminal cap. Used by DrainQueueForWorkspace for
// transient gateway-origin failures (Cloudflare 502, push-route blip, "no
// healthy upstream") where the workspace is online and heartbeating — the
// failure is in the path BETWEEN the platform and the agent, not in the
// agent itself. The PM 2026-06-21 RCA caught that the previous behaviour
// (always MarkQueueItemFailed) consumed the cap on healthy workspaces and
// stranded queued requests until TTL.
//
// Parameters: id is the a2a_queue row to re-queue; errMsg is stored
// verbatim in last_error. Nothing is returned: a failed UPDATE is logged
// and otherwise ignored, matching the fire-and-forget drain callers.
//
// Mechanism: DequeueNext increments `attempts` at dispatch time under
// FOR UPDATE SKIP LOCKED. This function undoes that increment
// (GREATEST(attempts - 1, 0), so the counter never goes negative) so a
// transient retry does not advance the cap counter. The row goes back to
// 'queued' status with dispatched_at = NULL, so a later sweep /
// heartbeat-drain picks it up naturally.
//
// Backoff (Researcher #3127 REQUEST_CHANGES follow-up): sets
// next_attempt_at = now() + make_interval(secs => transientRetryBackoffSecs)
// so the row is backoff-gated against re-dispatch for the window. This
// is the gate that prevents a capacity>1 DrainQueueForWorkspace from
// tight-looping on the same row (the just-requeued row would otherwise
// be eligible for re-claim on the very next for-loop iteration, and
// would hit the same gateway failure again without ever burning an
// attempt or being delayed). DequeueNext's WHERE clause skips rows
// whose next_attempt_at is still in the future. The seconds count is
// passed as bind parameter $3 (rather than inlined as
// `interval '5 seconds'`) so the transientRetryBackoffSecs Go constant
// drives the SQL behavior directly — golangci-lint flagged the previous
// unused-const shape.
//
// Race-safety note: between DequeueNext's COMMIT and this UPDATE, the row
// is in 'dispatched' status, so a concurrent DequeueNext call (sweeper
// tick, second heartbeat in flight) cannot re-claim it. The status='queued'
// transition is the only window during which re-claim is possible, and it
// is bounded by the time this UPDATE takes to commit.
//
// NOTE(review): the UPDATE matches on id only and does not check the
// row's current status, so it would also re-queue a row that something
// else moved out of 'dispatched' in the meantime — confirm no such writer
// exists, or consider guarding on status in a follow-up (the drain tests
// match this SQL text verbatim and would need updating with it).
func MarkQueueItemTransientRetry(ctx context.Context, id, errMsg string) {
if _, err := db.DB.ExecContext(ctx, `
UPDATE a2a_queue
SET status = 'queued',
attempts = GREATEST(attempts - 1, 0),
last_error = $2,
dispatched_at = NULL,
next_attempt_at = now() + make_interval(secs => $3)
WHERE id = $1
`, id, errMsg, transientRetryBackoffSecs); err != nil {
log.Printf("A2AQueue: failed to mark %s for transient retry: %v", id, err)
}
}
// DropStaleQueueItems marks queued items older than maxAge as 'dropped' with a
// system-generated reason so PM agents stop processing stale post-incident noise.
// Called with a workspaceID to scope cleanup to one workspace, or empty to sweep
@@ -367,6 +440,19 @@ func DropStaleQueueItems(ctx context.Context, workspaceID string, maxAgeMinutes
// spare capacity, and from the periodic A2A queue sweeper as a fallback when
// heartbeats stop (#2930). Errors here are logged but not returned — callers
// are fire-and-forget goroutines.
//
// #2026-06-21 PM RCA: distinguish GATEWAY-ORIGIN failures (transient
// Cloudflare 502 / push-route blip / "no healthy upstream") from TRUE
// dead-agent failures. Healthy workspaces that happened to get a 502
// from the CDN were terminal-failing the queue item under the previous
// behaviour — MarkQueueItemFailed increments attempts each tick, so a
// transient blip that lasted 5 ticks would burn the cap and strand the
// request at 'failed'. Now: gateway-origin failures with a recent
// heartbeat invalidate the cached URL, re-queue via
// MarkQueueItemTransientRetry (which DOES NOT advance the 5-attempt
// counter), and let the next sweep retry. Only confirmed-dead agents
// (Classification="upstream_dead") or non-gateway failures continue
// through MarkQueueItemFailed.
func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspaceID string, capacity int) {
if capacity <= 0 {
return
@@ -385,6 +471,15 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
if item.CallerID.Valid {
callerID = item.CallerID.String
}
// Resolve the agent URL up front so every drain log line carries it.
// resolveAgentURL swallows its own errors into a proxyA2AError, so a
// resolution failure here is rare — usually a workspace with no URL
// row. Empty string is fine for the log; the dispatch below will
// produce the structured error and we already log it.
resolvedURL, _ := h.resolveAgentURL(ctx, workspaceID)
log.Printf("A2AQueue drain: dispatching queue_id=%s workspace_id=%s url=%s attempt=%d",
item.ID, workspaceID, resolvedURL, item.Attempts)
// logActivity=false: the original EnqueueA2A callsite already logged
// the dispatch attempt; re-logging here would double-count events.
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false, false)
@@ -395,7 +490,8 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
// count attempts; the new (re-)queue row already exists.
if status == http.StatusAccepted {
MarkQueueItemCompleted(ctx, item.ID, nil)
log.Printf("A2AQueue drain: %s re-queued (target still busy)", item.ID)
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s re-queued (target still busy)",
item.ID, workspaceID)
continue
}
@@ -412,14 +508,36 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
errMsg = "unknown drain dispatch error"
}
}
classification := proxyErr.Classification
// #2026-06-21 PM RCA: transient gateway-origin failure (CF 5xx,
// push-route blip, "no healthy upstream") on a workspace that is
// still heartbeating → re-queue without burning the 5-attempt cap.
// The agent is alive; the path between us and the agent is not.
// Invalidate the cached URL so the next retry re-resolves, and
// hand off to MarkQueueItemTransientRetry which undoes the
// DequeueNext attempts-increment.
if isGatewayOriginFailure(proxyErr) && h.hasRecentHeartbeat(ctx, workspaceID) {
h.invalidateCachedURLForDrain(ctx, workspaceID)
MarkQueueItemTransientRetry(ctx, item.ID,
fmt.Sprintf("transient gateway origin (%s, status=%d): %s",
classificationOrUnknown(classification), proxyErr.Status, errMsg))
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s url=%s transient gateway failure "+
"(status=%d classification=%s) — re-queued without burning attempt cap (attempts preserved at %d)",
item.ID, workspaceID, resolvedURL, proxyErr.Status, classificationOrUnknown(classification), item.Attempts)
continue
}
MarkQueueItemFailed(ctx, item.ID, errMsg)
log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %s",
item.ID, item.Attempts, errMsg)
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s url=%s dispatch failed "+
"(attempt=%d status=%d classification=%s): %s",
item.ID, workspaceID, resolvedURL, item.Attempts, proxyErr.Status,
classificationOrUnknown(classification), errMsg)
continue
}
MarkQueueItemCompleted(ctx, item.ID, respBody)
log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
item.ID, workspaceID, item.Attempts)
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s url=%s dispatched (attempt=%d)",
item.ID, workspaceID, resolvedURL, item.Attempts)
// Stitch the response back to the originating delegation row, if this
// queue item was a delegation. Without this, check_task_status would
@@ -434,6 +552,17 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
}
}
// classificationOrUnknown substitutes the literal "unknown" for an empty
// proxyA2AError.Classification so the structured drain log line always
// carries a non-empty classification field (friendlier to log-scrapers
// and human readers than a dangling "classification="). Any non-empty
// value, including whitespace, is returned unchanged.
func classificationOrUnknown(c string) string {
	if c != "" {
		return c
	}
	return "unknown"
}
// extractDelegationIDFromBody pulls params.message.metadata.delegation_id
// out of an A2A JSON-RPC body. Empty string when absent — drain treats
// that as "this queue item didn't originate from /workspaces/:id/delegate"
@@ -21,6 +21,7 @@ import (
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/DATA-DOG/go-sqlmock"
"github.com/alicebob/miniredis/v2"
"github.com/gin-gonic/gin"
)
// setupTestDBForQueueTests creates a sqlmock DB using QueryMatcherEqual (exact
@@ -246,10 +247,13 @@ func drainItem(wsID string) *QueuedItem {
// BEGIN → SELECT FOR UPDATE SKIP LOCKED → UPDATE status='dispatched', attempts=attempts+1 → COMMIT
//
// SQL strings are EXACT matches to the handler code — QueryMatcherEqual verifies verbatim.
// The next_attempt_at filter was added in #3127 follow-up; without it the
// `WHERE (next_attempt_at IS NULL OR next_attempt_at <= now())` clause
// wouldn't match the handler's exact SQL string.
func expectDequeueNextOk(mock sqlmock.Sqlmock, item *QueuedItem) {
mock.ExpectBegin()
mock.ExpectQuery(
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
WithArgs(item.WorkspaceID).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "caller_id", "priority", "body", "method", "attempts",
@@ -265,10 +269,11 @@ func expectDequeueNextOk(mock sqlmock.Sqlmock, item *QueuedItem) {
}
// expectDequeueNextEmpty sets up sqlmock for DequeueNext returning no rows.
// next_attempt_at filter added in #3127 follow-up.
func expectDequeueNextEmpty(mock sqlmock.Sqlmock, wsID string) {
mock.ExpectBegin()
mock.ExpectQuery(
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
mock.ExpectRollback()
@@ -290,6 +295,59 @@ func expectFailed(mock sqlmock.Sqlmock, id string, errMsg string) {
WillReturnResult(sqlmock.NewResult(0, 1))
}
// expectTransientRetry registers the sqlmock expectation for the single
// UPDATE that MarkQueueItemTransientRetry issues, and makes it report one
// affected row.
//
// errMsg is a sqlmock.Argument matcher for the last_error bind value
// ($2); tests that only care about the SQL shape pass sqlmock.AnyArg().
// id ($1) and the backoff seconds ($3) are matched by value.
//
// #3127 follow-up: the statement also sets next_attempt_at = now() +
// make_interval(secs => $3) so that DequeueNext's WHERE clause (added in
// the same change) skips the row during the backoff window. The seconds
// value is the transientRetryBackoffSecs Go constant rather than a
// literal interval string inlined in the SQL, so the constant is the
// single source of truth for both the handler and this expectation.
func expectTransientRetry(mock sqlmock.Sqlmock, id string, errMsg sqlmock.Argument) {
	const transientRetrySQL = "UPDATE a2a_queue SET status = 'queued', attempts = GREATEST(attempts - 1, 0), last_error = $2, dispatched_at = NULL, next_attempt_at = now() + make_interval(secs => $3) WHERE id = $1"

	oneRowUpdated := sqlmock.NewResult(0, 1)
	expectation := mock.ExpectExec(transientRetrySQL)
	expectation = expectation.WithArgs(id, errMsg, transientRetryBackoffSecs)
	expectation.WillReturnResult(oneRowUpdated)
}
// expectRuntimeLookup registers the sqlmock expectation for the runtime
// lookup made by handleMockA2A (lookupRuntime). The proxy runs that query
// on every dispatch to decide whether to short-circuit with a canned mock
// reply; answering with a non-mock runtime ("claude-code") lets the
// request fall through to the real agent path. The drain tests do not
// exercise the mock path, but the query happens unconditionally, so the
// expectation is needed to keep the test logs clean.
func expectRuntimeLookup(mock sqlmock.Sqlmock, workspaceID string) {
	runtimeRows := sqlmock.NewRows([]string{"runtime"})
	runtimeRows.AddRow("claude-code")

	lookup := mock.ExpectQuery("SELECT runtime FROM workspaces WHERE id = $1")
	lookup.WithArgs(workspaceID).WillReturnRows(runtimeRows)
}
// expectRecentHeartbeatAbsent registers the sqlmock expectation for
// hasRecentHeartbeat's query and answers it with a single NULL
// last_heartbeat_at. DrainQueueForWorkspace treats that as "no recent
// heartbeat" and falls through to MarkQueueItemFailed (the pre-fix
// behaviour). Used by tests covering the dead-agent / non-transient
// failure paths.
func expectRecentHeartbeatAbsent(mock sqlmock.Sqlmock, workspaceID string) {
	noHeartbeat := sqlmock.NewRows([]string{"last_heartbeat_at"})
	noHeartbeat.AddRow(nil)

	heartbeatQuery := mock.ExpectQuery("SELECT last_heartbeat_at FROM workspaces WHERE id = $1")
	heartbeatQuery.WithArgs(workspaceID).WillReturnRows(noHeartbeat)
}
// expectRecentHeartbeatPresent registers the sqlmock expectation for
// hasRecentHeartbeat's query and answers it with the current time as
// last_heartbeat_at. DrainQueueForWorkspace treats that as "workspace is
// alive", which makes the transient gateway-origin path eligible. Used by
// the regression tests that pin the new behaviour.
func expectRecentHeartbeatPresent(mock sqlmock.Sqlmock, workspaceID string) {
	freshHeartbeat := sqlmock.NewRows([]string{"last_heartbeat_at"})
	freshHeartbeat.AddRow(time.Now())

	heartbeatQuery := mock.ExpectQuery("SELECT last_heartbeat_at FROM workspaces WHERE id = $1")
	heartbeatQuery.WithArgs(workspaceID).WillReturnRows(freshHeartbeat)
}
// agentServer creates an httptest.Server that responds with the given status
// and optional JSON body.
func agentServer(body string, status int) *httptest.Server {
@@ -379,6 +437,8 @@ func TestDrainQueueForWorkspace_ProxyErrResponseNil_NoPanic(t *testing.T) {
mock, handler, mr := drainSetup(t, item.WorkspaceID)
expectDequeueNextOk(mock, item)
expectQueueBudgetCheck(mock, item.WorkspaceID)
expectRuntimeLookup(mock, item.WorkspaceID)
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
srv := agentServer("", http.StatusBadGateway)
defer srv.Close()
@@ -400,6 +460,11 @@ func TestDrainQueueForWorkspace_ProxyErrMissingErrorKey_UsesStatusText(t *testin
mock, handler, mr := drainSetup(t, item.WorkspaceID)
expectDequeueNextOk(mock, item)
expectQueueBudgetCheck(mock, item.WorkspaceID)
expectRuntimeLookup(mock, item.WorkspaceID)
// 500 is NOT in isUpstreamDeadStatus so isGatewayOriginFailure returns
// false and hasRecentHeartbeat is never consulted — no SQL mock needed
// for the transient-retry path. Falls through to MarkQueueItemFailed
// (the pre-fix behaviour for non-gateway failures).
srv := agentServer(`{"code":500,"detail":"internal server error"}`, http.StatusInternalServerError)
defer srv.Close()
@@ -421,6 +486,8 @@ func TestDrainQueueForWorkspace_ProxyErrNonStringError_NoPanic(t *testing.T) {
mock, handler, mr := drainSetup(t, item.WorkspaceID)
expectDequeueNextOk(mock, item)
expectQueueBudgetCheck(mock, item.WorkspaceID)
expectRuntimeLookup(mock, item.WorkspaceID)
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
srv := agentServer(`{"error": 429}`, http.StatusServiceUnavailable)
defer srv.Close()
@@ -442,6 +509,8 @@ func TestDrainQueueForWorkspace_ProxyErrWithStringError_UsesErrorMessage(t *test
mock, handler, mr := drainSetup(t, item.WorkspaceID)
expectDequeueNextOk(mock, item)
expectQueueBudgetCheck(mock, item.WorkspaceID)
expectRuntimeLookup(mock, item.WorkspaceID)
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
wantErrMsg := "upstream agent crashed with signal: killed"
srv := agentServer(fmt.Sprintf(`{"error":%q}`, wantErrMsg), http.StatusBadGateway)
@@ -478,7 +547,7 @@ func TestDrainQueueForWorkspace_DequeueError_LogsAndReturns(t *testing.T) {
mock.ExpectBegin()
mock.ExpectQuery(
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
WithArgs("ws-dequeue-err").
WillReturnError(sql.ErrConnDone)
mock.ExpectRollback()
@@ -505,6 +574,11 @@ func TestDrainQueueForWorkspace_MaxAttempts_FailsRatherThanRetries(t *testing.T)
mock, handler, mr := drainSetup(t, item.WorkspaceID)
expectDequeueNextOk(mock, item)
expectQueueBudgetCheck(mock, item.WorkspaceID)
expectRuntimeLookup(mock, item.WorkspaceID)
// No recent heartbeat → falls through to MarkQueueItemFailed (not the
// transient-retry path). This pins the pre-fix behaviour for dead /
// unreachable workspaces: the 5-attempt cap still fires after 5 retries.
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
srv := agentServer(`{"error":"agent unreachable"}`, http.StatusBadGateway)
defer srv.Close()
@@ -550,3 +624,210 @@ func TestDrainQueueForWorkspace_ClaimGuarding_SecondDrainGetsEmpty(t *testing.T)
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ==================== 2026-06-21 PM RCA: transient gateway-retry path ====================
//
// The PM RCA found that DrainQueueForWorkspace was treating every
// 502/503/504 from the upstream proxy as a "dead agent unreachable"
// failure and burning the 5-attempt cap on otherwise-healthy
// workspaces. The new path: when the workspace has a recent heartbeat
// AND the failure is a gateway-origin dead-origin status (502/503/504
// or 521/522/523/524), re-queue via MarkQueueItemTransientRetry which
// does NOT advance the attempts counter, and invalidate the cached
// agent URL so the next retry re-resolves it from the DB. Only
// confirmed-dead agents (Classification="upstream_dead") and non-
// gateway failures continue to use MarkQueueItemFailed.
//
// The tests below pin the new contract end-to-end: the new SQL
// UPDATE statement, the URL cache invalidation, the heartbeat gate,
// the "upstream_dead" bypass of the transient path, and the retry
// backoff that stops a capacity>1 drain from re-claiming the row.
// TestDrainQueueForWorkspace_TransientGatewayFailure_StaysQueued is the
// regression test for the RCA. Online workspace + queued item +
// transient 502 (Cloudflare tunnel error page) + recent heartbeat →
// MarkQueueItemTransientRetry (NOT MarkQueueItemFailed), so the
// 5-attempt cap is preserved for actual dead-agent failures.
//
// The assertion is carried entirely by the sqlmock expectations: the only
// write registered after dispatch is the transient-retry UPDATE, so if the
// drain takes the MarkQueueItemFailed path instead, that expectation is
// left unmet and ExpectationsWereMet reports it.
func TestDrainQueueForWorkspace_TransientGatewayFailure_StaysQueued(t *testing.T) {
	item := drainItem("ws-gateway-blip")
	mock, handler, mr := drainSetup(t, item.WorkspaceID)

	expectDequeueNextOk(mock, item)
	expectQueueBudgetCheck(mock, item.WorkspaceID)
	expectRuntimeLookup(mock, item.WorkspaceID)

	// Recent heartbeat: the workspace is alive; the failure is in the
	// path between us and the agent, not the agent itself.
	expectRecentHeartbeatPresent(mock, item.WorkspaceID)

	// Cloudflare-style 502 error page: an HTML body, not JSON. This is
	// the shape that triggered the RCA — a healthy workspace's A2A
	// forward hits a CDN tunnel blip and gets a 502 with an HTML body.
	srv := agentServer(`<html>cloudflare error</html>`, http.StatusBadGateway)
	defer srv.Close()
	seedRedisURL(t, mr, item.WorkspaceID, srv.URL)

	// Expect MarkQueueItemTransientRetry (NOT MarkQueueItemFailed).
	// DrainQueueForWorkspace builds last_error as
	// "transient gateway origin (<classification>, status=<code>): <msg>".
	// The exact text depends on how the proxy renders the HTML body, so
	// it is deliberately matched with AnyArg here; this test pins which
	// UPDATE runs, not the wording of last_error.
	expectTransientRetry(mock, item.ID, sqlmock.AnyArg())

	handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 1)

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestDrainQueueForWorkspace_TransientGatewayFailure_InvalidatesCachedURL
// checks the cache side effect of the transient-retry path: the cached
// agent URL must be evicted from Redis so the next drain tick does a
// fresh DB lookup. Without that, a stale URL pointing at a
// temporarily-flapped tunnel would keep hitting the same broken endpoint.
// The eviction goes through ClearWorkspaceKeys, which removes the three
// ws:<id>:* keys (liveness, url, internal_url) in one shot; this test
// looks only at the url key.
func TestDrainQueueForWorkspace_TransientGatewayFailure_InvalidatesCachedURL(t *testing.T) {
	item := drainItem("ws-invalidate")
	mock, handler, mr := drainSetup(t, item.WorkspaceID)

	// Agent that answers every request with an empty-bodied 502, cached
	// under the workspace's URL key before the drain runs.
	srv := agentServer("", http.StatusBadGateway)
	defer srv.Close()
	seedRedisURL(t, mr, item.WorkspaceID, srv.URL)

	// Database traffic for one drain iteration that ends on the
	// transient-retry path (recent heartbeat present).
	expectDequeueNextOk(mock, item)
	expectQueueBudgetCheck(mock, item.WorkspaceID)
	expectRuntimeLookup(mock, item.WorkspaceID)
	expectRecentHeartbeatPresent(mock, item.WorkspaceID)
	expectTransientRetry(mock, item.ID, sqlmock.AnyArg())

	handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 1)

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}

	// seedRedisURL stored the URL under "ws:<id>:url"; after the drain a
	// successful read with a non-empty value means it was not evicted.
	urlKey := fmt.Sprintf("ws:%s:url", item.WorkspaceID)
	cached, getErr := mr.Get(urlKey)
	if getErr == nil && cached != "" {
		t.Errorf("cached URL survived transient-retry invalidation: got=%q want empty", cached)
	}
}
// TestDrainQueueForWorkspace_GatewayFailure_NoRecentHeartbeat_StillFails
// pins the heartbeat gate, the load-bearing part of the new path. When
// the workspace is NOT heartbeating, a 502 stays a dead-agent failure —
// a genuinely-dead workspace must not be re-queued for free. Expected
// outcome: gateway-origin status + no recent heartbeat →
// MarkQueueItemFailed with the status text "Bad Gateway", same as the
// pre-fix behaviour.
func TestDrainQueueForWorkspace_GatewayFailure_NoRecentHeartbeat_StillFails(t *testing.T) {
	item := drainItem("ws-no-hb")
	mock, handler, mr := drainSetup(t, item.WorkspaceID)

	// Agent that answers with an empty-bodied 502.
	srv := agentServer("", http.StatusBadGateway)
	defer srv.Close()
	seedRedisURL(t, mr, item.WorkspaceID, srv.URL)

	// One drain iteration whose heartbeat lookup comes back NULL, so the
	// failure is recorded through MarkQueueItemFailed.
	expectDequeueNextOk(mock, item)
	expectQueueBudgetCheck(mock, item.WorkspaceID)
	expectRuntimeLookup(mock, item.WorkspaceID)
	expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
	expectFailed(mock, item.ID, "Bad Gateway")

	handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 1)

	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unmet sqlmock expectations: %v", unmet)
	}
}
// TestDrainQueueForWorkspace_UpstreamDead_BypassesTransientPath pins the
// predicate that decides whether the transient-retry path is eligible.
// When the proxy already confirmed a dead container (Classification =
// "upstream_dead", set by maybeMarkContainerDead in
// handleA2ADispatchError), the failure is a real dead-agent failure and
// the 5-attempt cap MUST be allowed to fire, so isGatewayOriginFailure
// has to return false and the drain falls through to MarkQueueItemFailed.
//
// A proxyA2AError with Classification="upstream_dead" cannot easily be
// injected through the normal DrainQueueForWorkspace path (the test
// infrastructure uses an httptest.Server for the agent, which does not
// go through maybeMarkContainerDead), so this is a table-driven unit
// test of isGatewayOriginFailure itself. It covers every branch of the
// predicate, including the nil guard.
func TestDrainQueueForWorkspace_UpstreamDead_BypassesTransientPath(t *testing.T) {
	cases := []struct {
		name     string
		proxyErr *proxyA2AError
		want     bool
		why      string
	}{
		{
			name:     "nil error",
			proxyErr: nil,
			want:     false,
			why:      "no proxy error means there is nothing to classify",
		},
		{
			name: "502 classified upstream_dead",
			proxyErr: &proxyA2AError{
				Status:         http.StatusBadGateway,
				Response:       gin.H{"error": "workspace agent unreachable — container restart triggered"},
				Classification: "upstream_dead",
			},
			want: false,
			why:  "confirmed-dead must bypass the transient-retry path",
		},
		{
			name: "502 without classification",
			proxyErr: &proxyA2AError{
				Status:   http.StatusBadGateway,
				Response: gin.H{"error": "bad gateway"},
			},
			want: true,
			why:  "502 is gateway-origin when the proxy has not confirmed a dead container",
		},
		{
			name: "500 without classification",
			proxyErr: &proxyA2AError{
				Status:   http.StatusInternalServerError,
				Response: gin.H{"error": "agent crashed"},
			},
			want: false,
			why:  "agent-authored 5xx is not a gateway-origin failure",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := isGatewayOriginFailure(tc.proxyErr); got != tc.want {
				t.Errorf("isGatewayOriginFailure(%s) = %v, want %v — %s",
					tc.name, got, tc.want, tc.why)
			}
		})
	}
}
// TestDrainQueueForWorkspace_TransientRetry_BackoffBreaksCapacityLoop:
// Regression test for Researcher #3127 REQUEST_CHANGES. The original
// transient-retry fix requeued the row with status='queued' and no
// backoff, so a capacity>1 DrainQueueForWorkspace could re-claim the
// just-requeued row on the very next for-loop iteration and hit the
// same gateway failure in a tight loop. The fix: next_attempt_at = now() + 5s
// on transient retry, plus a WHERE clause in DequeueNext that skips
// rows whose next_attempt_at is still in the future.
//
// Scenario: capacity=2, one queued item that hits a transient 502, then
// a second DequeueNext that finds nothing eligible.
//
// What this test can and cannot prove: sqlmock does not execute SQL, so
// the "row is skipped" outcome of the second DequeueNext is scripted by
// the mock (sql.ErrNoRows), not produced by the WHERE clause. The test
// pins (a) the exact SQL text of both statements — the helpers' query
// strings include the next_attempt_at filter and the next_attempt_at
// assignment, and the suite's matcher compares them verbatim — (b) that
// the backoff seconds are bound as a parameter, and (c) that the drain
// loop issues exactly one dispatch and stops once DequeueNext reports an
// empty queue. The actual row filtering needs a test against a real
// PostgreSQL instance.
func TestDrainQueueForWorkspace_TransientRetry_BackoffBreaksCapacityLoop(t *testing.T) {
item := drainItem("ws-capacity-loop")
mock, handler, mr := drainSetup(t, item.WorkspaceID)
// Iteration 1 of the for-loop (capacity=2): the only queued row is
// claimed, dispatched, and hits a transient 502. Recent heartbeat
// keeps the transient-retry path eligible.
expectDequeueNextOk(mock, item)
expectQueueBudgetCheck(mock, item.WorkspaceID)
expectRuntimeLookup(mock, item.WorkspaceID)
expectRecentHeartbeatPresent(mock, item.WorkspaceID)
srv := agentServer("", http.StatusBadGateway)
defer srv.Close()
seedRedisURL(t, mr, item.WorkspaceID, srv.URL)
expectTransientRetry(mock, item.ID, sqlmock.AnyArg())
// Iteration 2 of the for-loop (capacity=2): in production the
// just-requeued row would still be the highest-priority item, but its
// next_attempt_at is now() + 5s, so DequeueNext's WHERE clause skips
// it. The mock stands in for that outcome by returning sql.ErrNoRows.
// No further expectations are registered after this one, so the
// sequence only matches if the second iteration ends here without a
// second MarkQueueItemTransientRetry / MarkQueueItemFailed.
expectDequeueNextEmpty(mock, item.WorkspaceID)
handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 2)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
@@ -0,0 +1,2 @@
-- Reverses the #3127 follow-up migration: removes the partial index
-- idx_a2a_queue_next_attempt_at and then the next_attempt_at column it
-- is built on. Both statements use IF EXISTS, so re-running this file
-- against a schema that never had (or already lost) them is a no-op.
DROP INDEX IF EXISTS idx_a2a_queue_next_attempt_at;
ALTER TABLE a2a_queue DROP COLUMN IF EXISTS next_attempt_at;
@@ -0,0 +1,38 @@
-- #3127 (Researcher follow-up): backoff for transient gateway-origin
-- queue retries. Without a per-row "not before" gate, DrainQueueForWorkspace
-- with capacity>1 can re-dispatch the same item inside the same call
-- immediately after MarkQueueItemTransientRetry (because the row's
-- status='queued' AND the for-loop in DrainQueueForWorkspace iterates up
-- to capacity times). MarkQueueItemTransientRetry sets the new column to
-- now() + 5s; DequeueNext's WHERE clause skips rows whose next_attempt_at
-- is still in the future. This breaks the tight retry loop without
-- requiring a schema-foreign "stop draining" branch.
--
-- Column is nullable: NULL = no backoff constraint (default state for
-- rows that have never been transient-retried, and for the legacy
-- MarkQueueItemFailed path that doesn't touch this column).
--
-- Index strategy: a NEW partial index restricted to rows with
-- status = 'queued' AND next_attempt_at IS NOT NULL, keyed by
-- (workspace_id, next_attempt_at, priority DESC, enqueued_at ASC).
-- The predicate deliberately contains no now(): PostgreSQL only accepts
-- IMMUTABLE functions in an index predicate, and now() is STABLE, so the
-- previous iteration of this migration (`next_attempt_at > now()`)
-- failed at DDL time. The index is intended for the rare gated-row
-- case; the existing idx_a2a_queue_dispatch is expected to keep serving
-- the common NULL case via row-filter. next_attempt_at is a key column
-- (not part of the predicate) so the planner can range-scan it during
-- the gated case.
--
-- #3127 PR follow-up (Researcher/CR2 REQUEST_CHANGES on 7df1b5e9):
-- replaced the originally-proposed `next_attempt_at > now()` partial
-- predicate with the `next_attempt_at IS NOT NULL` predicate, which
-- references only IMMUTABLE expressions and can therefore be created
-- at deploy time.
--
-- Both statements are idempotent (IF NOT EXISTS), so re-running the
-- migration is safe.
ALTER TABLE a2a_queue ADD COLUMN IF NOT EXISTS next_attempt_at TIMESTAMPTZ;
CREATE INDEX IF NOT EXISTS idx_a2a_queue_next_attempt_at
ON a2a_queue (workspace_id, next_attempt_at, priority DESC, enqueued_at ASC)
WHERE status = 'queued'
AND next_attempt_at IS NOT NULL;