fix(a2a-queue): preserve 5-attempt cap on transient gateway-origin drain failures #3127
@@ -76,7 +76,7 @@ func TestDequeueNext_PreservesFullBody_NoTruncation(t *testing.T) {
|
||||
// is that DequeueNext propagates it untouched into item.Body.
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectQuery(
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "caller_id", "priority", "body", "method", "attempts",
|
||||
|
||||
@@ -52,6 +52,52 @@ type proxyDispatchBuildError struct{ err error }
|
||||
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// isGatewayOriginFailure reports whether a proxy error looks like a transient
|
||||
// gateway-origin failure (Cloudflare 5xx tunnel, "no healthy upstream",
|
||||
// push-route blip) rather than a confirmed-dead workspace agent. The PM
|
||||
// 2026-06-21 RCA found that DrainQueueForWorkspace was treating these
|
||||
// transient 502/503/504 responses as generic "dead agent unreachable"
|
||||
// failures and burning the 5-attempt terminal cap on otherwise-healthy
|
||||
// workspaces.
|
||||
//
|
||||
// Distinction:
|
||||
// - proxyErr.Classification == "upstream_dead" → the proxy already
|
||||
// confirmed the container is dead via maybeMarkContainerDead /
|
||||
// preflightContainerHealth. That is a real dead-agent failure and
|
||||
// MUST keep going through MarkQueueItemFailed so the cap can fire.
|
||||
// - isUpstreamDeadStatus(status) (502/503/504/521-524) without an
|
||||
// "upstream_dead" classification → the proxy saw a dead-origin
|
||||
// status from a CDN/gateway but did NOT confirm a dead container.
|
||||
// This is the gateway-origin family; with a recent heartbeat from
|
||||
// the target workspace it is almost certainly a transient upstream
|
||||
// blip and should be re-queued without burning an attempt.
|
||||
//
|
||||
// Anything else (5xx not in the dead-origin set, 4xx) is not a
|
||||
// gateway-origin failure and should be handled by the regular
|
||||
// MarkQueueItemFailed path. The classification field is authoritative
|
||||
// when set; the status code is the fallback signal.
|
||||
func isGatewayOriginFailure(proxyErr *proxyA2AError) bool {
|
||||
if proxyErr == nil {
|
||||
return false
|
||||
}
|
||||
if proxyErr.Classification == "upstream_dead" {
|
||||
return false
|
||||
}
|
||||
return isUpstreamDeadStatus(proxyErr.Status)
|
||||
}
|
||||
|
||||
// invalidateCachedURLForDrain evicts the cached agent URL for workspaceID
|
||||
// from Redis so the next drain tick re-resolves it from the DB. Called
|
||||
// on transient gateway-origin failures where the cached URL is a likely
|
||||
// contributor (stale mapping after a tunnel flap, container port change
|
||||
// behind a CDN, etc.). db.ClearWorkspaceKeys already swallows Redis
|
||||
// errors internally (the platform's Redis layer is best-effort for the
|
||||
// URL cache — a cache-miss is harmless, just slower), so this helper
|
||||
// exists mainly for symmetry with the other drain instrumentation.
|
||||
func (h *WorkspaceHandler) invalidateCachedURLForDrain(ctx context.Context, workspaceID string) {
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
}
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and records the outcome. Busy
|
||||
// targets that are successfully queued are logged as queued, not failed.
|
||||
|
||||
@@ -92,6 +92,20 @@ const (
|
||||
a2aQueueSweeperStatusAlert = 10 // log a warning every N stranded items
|
||||
)
|
||||
|
||||
// transientRetryBackoffSecs is how long a MarkQueueItemTransientRetry
|
||||
// row remains ineligible for re-dispatch, expressed in seconds (the
|
||||
// integer form that PostgreSQL's make_interval(secs => $N) accepts).
|
||||
//
|
||||
// #3127 follow-up (Researcher REQUEST_CHANGES) — the transient-retry
|
||||
// path requeues with status='queued' but the same DrainQueueForWorkspace
|
||||
// for-loop can iterate up to capacity times. Without this backoff, a
|
||||
// capacity>1 drain would re-claim the just-requeued row on the next
|
||||
// iteration and hit the same gateway failure again, in a tight loop.
|
||||
// 5s is long enough to break that loop (sweeper interval is 10s,
|
||||
// heartbeats typically every 5-30s) and short enough that recovery on
|
||||
// the next heartbeat is not perceptibly delayed.
|
||||
const transientRetryBackoffSecs = 5
|
||||
|
||||
// QueuedItem is what the heartbeat drain path pulls off the queue.
|
||||
type QueuedItem struct {
|
||||
ID string
|
||||
@@ -223,7 +237,18 @@ func EnqueueA2A(
|
||||
// 'dispatched'. Uses SELECT ... FOR UPDATE SKIP LOCKED so two concurrent
|
||||
// drain calls don't both claim the same row.
|
||||
//
|
||||
// Returns (nil, nil) when the queue is empty — not an error.
|
||||
// Honors a per-row next_attempt_at backoff (added in #3127 follow-up
|
||||
// migration 20260621120000). Rows whose next_attempt_at is in the future
|
||||
// are SKIPPED — they remain 'queued' but are not eligible for dispatch
|
||||
// until the backoff expires. This is the gate that breaks the
|
||||
// capacity>1 tight-retry loop on a flapping gateway: when
|
||||
// MarkQueueItemTransientRetry sets next_attempt_at = now() + 5s, the
|
||||
// same for-loop iteration that just requeued the row cannot re-dequeue
|
||||
// it on the very next iteration even if the row is still highest
|
||||
// priority.
|
||||
//
|
||||
// Returns (nil, nil) when the queue is empty (or all eligible rows are
|
||||
// backoff-gated) — not an error.
|
||||
func DequeueNext(ctx context.Context, workspaceID string) (*QueuedItem, error) {
|
||||
tx, err := db.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
@@ -238,6 +263,7 @@ func DequeueNext(ctx context.Context, workspaceID string) (*QueuedItem, error) {
|
||||
FROM a2a_queue
|
||||
WHERE workspace_id = $1 AND status = 'queued'
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
AND (next_attempt_at IS NULL OR next_attempt_at <= now())
|
||||
ORDER BY priority DESC, enqueued_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
@@ -305,6 +331,53 @@ func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
|
||||
}
|
||||
}
|
||||
|
||||
// MarkQueueItemTransientRetry returns a dispatched item to 'queued' WITHOUT
|
||||
// burning the 5-attempt terminal cap. Used by DrainQueueForWorkspace for
|
||||
// transient gateway-origin failures (Cloudflare 502, push-route blip, "no
|
||||
// healthy upstream") where the workspace is online and heartbeating — the
|
||||
// failure is in the path BETWEEN the platform and the agent, not in the
|
||||
// agent itself. The PM 2026-06-21 RCA caught that the previous behaviour
|
||||
// (always MarkQueueItemFailed) consumed the cap on healthy workspaces and
|
||||
// stranded queued requests until TTL.
|
||||
//
|
||||
// Mechanism: DequeueNext (line 256-262 of this file) increments `attempts`
|
||||
// at dispatch time under FOR UPDATE SKIP LOCKED. MarkQueueItemTransientRetry
|
||||
// undoes that increment so a transient retry does not advance the cap
|
||||
// counter. The row stays in 'queued' status with dispatched_at = NULL, so
|
||||
// the next sweep / heartbeat-drain picks it up naturally.
|
||||
//
|
||||
// Backoff (Researcher #3127 REQUEST_CHANGES follow-up): sets
|
||||
// next_attempt_at = now() + make_interval(secs => transientRetryBackoffSecs)
|
||||
// so the row is backoff-gated against re-dispatch for the window. This
|
||||
// is the gate that prevents a capacity>1 DrainQueueForWorkspace from
|
||||
// tight-looping on the same row (the just-requeued row would otherwise
|
||||
// be eligible for re-claim on the very next for-loop iteration, and
|
||||
// would hit the same gateway failure again without ever burning an
|
||||
// attempt or being delayed). DequeueNext's WHERE clause skips rows
|
||||
// whose next_attempt_at is still in the future. The seconds count is
|
||||
// passed as a parameter (rather than inlined as `interval '5 seconds'`)
|
||||
// so the transientRetryBackoff Go constant drives the SQL behavior
|
||||
// directly — golangci-lint flagged the previous unused-const shape.
|
||||
//
|
||||
// Race-safety note: between DequeueNext's COMMIT and this UPDATE, the row
|
||||
// is in 'dispatched' status, so a concurrent DequeueNext call (sweeper
|
||||
// tick, second heartbeat in flight) cannot re-claim it. The status='queued'
|
||||
// transition is the only window during which re-claim is possible, and it
|
||||
// is bounded by the time this UPDATE takes to commit.
|
||||
func MarkQueueItemTransientRetry(ctx context.Context, id, errMsg string) {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE a2a_queue
|
||||
SET status = 'queued',
|
||||
attempts = GREATEST(attempts - 1, 0),
|
||||
last_error = $2,
|
||||
dispatched_at = NULL,
|
||||
next_attempt_at = now() + make_interval(secs => $3)
|
||||
WHERE id = $1
|
||||
`, id, errMsg, transientRetryBackoffSecs); err != nil {
|
||||
log.Printf("A2AQueue: failed to mark %s for transient retry: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// DropStaleQueueItems marks queued items older than maxAge as 'dropped' with a
|
||||
// system-generated reason so PM agents stop processing stale post-incident noise.
|
||||
// Called with a workspaceID to scope cleanup to one workspace, or empty to sweep
|
||||
@@ -367,6 +440,19 @@ func DropStaleQueueItems(ctx context.Context, workspaceID string, maxAgeMinutes
|
||||
// spare capacity, and from the periodic A2A queue sweeper as a fallback when
|
||||
// heartbeats stop (#2930). Errors here are logged but not returned — callers
|
||||
// are fire-and-forget goroutines.
|
||||
//
|
||||
// #2026-06-21 PM RCA: distinguish GATEWAY-ORIGIN failures (transient
|
||||
// Cloudflare 502 / push-route blip / "no healthy upstream") from TRUE
|
||||
// dead-agent failures. Healthy workspaces that happened to get a 502
|
||||
// from the CDN were terminal-failing the queue item under the previous
|
||||
// behaviour — MarkQueueItemFailed increments attempts each tick, so a
|
||||
// transient blip that lasted 5 ticks would burn the cap and strand the
|
||||
// request at 'failed'. Now: gateway-origin failures with a recent
|
||||
// heartbeat invalidate the cached URL, re-queue via
|
||||
// MarkQueueItemTransientRetry (which DOES NOT advance the 5-attempt
|
||||
// counter), and let the next sweep retry. Only confirmed-dead agents
|
||||
// (Classification="upstream_dead") or non-gateway failures continue
|
||||
// through MarkQueueItemFailed.
|
||||
func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspaceID string, capacity int) {
|
||||
if capacity <= 0 {
|
||||
return
|
||||
@@ -385,6 +471,15 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
if item.CallerID.Valid {
|
||||
callerID = item.CallerID.String
|
||||
}
|
||||
// Resolve the agent URL up front so every drain log line carries it.
|
||||
// resolveAgentURL swallows its own errors into a proxyA2AError, so a
|
||||
// resolution failure here is rare — usually a workspace with no URL
|
||||
// row. Empty string is fine for the log; the dispatch below will
|
||||
// produce the structured error and we already log it.
|
||||
resolvedURL, _ := h.resolveAgentURL(ctx, workspaceID)
|
||||
log.Printf("A2AQueue drain: dispatching queue_id=%s workspace_id=%s url=%s attempt=%d",
|
||||
item.ID, workspaceID, resolvedURL, item.Attempts)
|
||||
|
||||
// logActivity=false: the original EnqueueA2A callsite already logged
|
||||
// the dispatch attempt; re-logging here would double-count events.
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false, false)
|
||||
@@ -395,7 +490,8 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
// count attempts; the new (re-)queue row already exists.
|
||||
if status == http.StatusAccepted {
|
||||
MarkQueueItemCompleted(ctx, item.ID, nil)
|
||||
log.Printf("A2AQueue drain: %s re-queued (target still busy)", item.ID)
|
||||
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s re-queued (target still busy)",
|
||||
item.ID, workspaceID)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -412,14 +508,36 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
errMsg = "unknown drain dispatch error"
|
||||
}
|
||||
}
|
||||
classification := proxyErr.Classification
|
||||
|
||||
// #2026-06-21 PM RCA: transient gateway-origin failure (CF 5xx,
|
||||
// push-route blip, "no healthy upstream") on a workspace that is
|
||||
// still heartbeating → re-queue without burning the 5-attempt cap.
|
||||
// The agent is alive; the path between us and the agent is not.
|
||||
// Invalidate the cached URL so the next retry re-resolves, and
|
||||
// hand off to MarkQueueItemTransientRetry which undoes the
|
||||
// DequeueNext attempts-increment.
|
||||
if isGatewayOriginFailure(proxyErr) && h.hasRecentHeartbeat(ctx, workspaceID) {
|
||||
h.invalidateCachedURLForDrain(ctx, workspaceID)
|
||||
MarkQueueItemTransientRetry(ctx, item.ID,
|
||||
fmt.Sprintf("transient gateway origin (%s, status=%d): %s",
|
||||
classificationOrUnknown(classification), proxyErr.Status, errMsg))
|
||||
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s url=%s transient gateway failure "+
|
||||
"(status=%d classification=%s) — re-queued without burning attempt cap (attempts preserved at %d)",
|
||||
item.ID, workspaceID, resolvedURL, proxyErr.Status, classificationOrUnknown(classification), item.Attempts)
|
||||
continue
|
||||
}
|
||||
|
||||
MarkQueueItemFailed(ctx, item.ID, errMsg)
|
||||
log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %s",
|
||||
item.ID, item.Attempts, errMsg)
|
||||
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s url=%s dispatch failed "+
|
||||
"(attempt=%d status=%d classification=%s): %s",
|
||||
item.ID, workspaceID, resolvedURL, item.Attempts, proxyErr.Status,
|
||||
classificationOrUnknown(classification), errMsg)
|
||||
continue
|
||||
}
|
||||
MarkQueueItemCompleted(ctx, item.ID, respBody)
|
||||
log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
|
||||
item.ID, workspaceID, item.Attempts)
|
||||
log.Printf("A2AQueue drain: queue_id=%s workspace_id=%s url=%s dispatched (attempt=%d)",
|
||||
item.ID, workspaceID, resolvedURL, item.Attempts)
|
||||
|
||||
// Stitch the response back to the originating delegation row, if this
|
||||
// queue item was a delegation. Without this, check_task_status would
|
||||
@@ -434,6 +552,17 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
}
|
||||
}
|
||||
|
||||
// classificationOrUnknown returns c unchanged when it is non-empty and the
// literal "unknown" otherwise. The drain log lines and the transient-retry
// last_error message use it so that an unset proxyA2AError.Classification
// never shows up as an empty field.
func classificationOrUnknown(c string) string {
	if c != "" {
		return c
	}
	return "unknown"
}
|
||||
|
||||
// extractDelegationIDFromBody pulls params.message.metadata.delegation_id
|
||||
// out of an A2A JSON-RPC body. Empty string when absent — drain treats
|
||||
// that as "this queue item didn't originate from /workspaces/:id/delegate"
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// setupTestDBForQueueTests creates a sqlmock DB using QueryMatcherEqual (exact
|
||||
@@ -246,10 +247,13 @@ func drainItem(wsID string) *QueuedItem {
|
||||
// BEGIN → SELECT FOR UPDATE SKIP LOCKED → UPDATE status='dispatched', attempts=attempts+1 → COMMIT
|
||||
//
|
||||
// SQL strings are EXACT matches to the handler code — QueryMatcherEqual verifies verbatim.
|
||||
// The next_attempt_at filter was added in #3127 follow-up; without it the
|
||||
// `WHERE (next_attempt_at IS NULL OR next_attempt_at <= now())` clause
|
||||
// wouldn't match the handler's exact SQL string.
|
||||
func expectDequeueNextOk(mock sqlmock.Sqlmock, item *QueuedItem) {
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectQuery(
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
WithArgs(item.WorkspaceID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "caller_id", "priority", "body", "method", "attempts",
|
||||
@@ -265,10 +269,11 @@ func expectDequeueNextOk(mock sqlmock.Sqlmock, item *QueuedItem) {
|
||||
}
|
||||
|
||||
// expectDequeueNextEmpty sets up sqlmock for DequeueNext returning no rows.
|
||||
// next_attempt_at filter added in #3127 follow-up.
|
||||
func expectDequeueNextEmpty(mock sqlmock.Sqlmock, wsID string) {
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectQuery(
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
WithArgs(wsID).
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectRollback()
|
||||
@@ -290,6 +295,59 @@ func expectFailed(mock sqlmock.Sqlmock, id string, errMsg string) {
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
}
|
||||
|
||||
// expectTransientRetry sets up mock for MarkQueueItemTransientRetry. The
|
||||
// errMsg is verified via the exact-match matcher; tests that only care
|
||||
// about the SQL shape (and want to assert on the row state separately)
|
||||
// can pass sqlmock.AnyArg() for the error-message column.
|
||||
//
|
||||
// #3127 follow-up: the SQL now also sets next_attempt_at = now() +
|
||||
// make_interval(secs => $3) so DequeueNext's WHERE clause (added in
|
||||
// the same change) skips the row during the backoff window. The seconds
|
||||
// count is parameterised via transientRetryBackoffSecs (Go constant)
|
||||
// rather than inlined as a literal interval string — golangci-lint
|
||||
// flagged the literal form as having an unused sibling const.
|
||||
func expectTransientRetry(mock sqlmock.Sqlmock, id string, errMsg sqlmock.Argument) {
|
||||
mock.ExpectExec(
|
||||
"UPDATE a2a_queue SET status = 'queued', attempts = GREATEST(attempts - 1, 0), last_error = $2, dispatched_at = NULL, next_attempt_at = now() + make_interval(secs => $3) WHERE id = $1").
|
||||
WithArgs(id, errMsg, transientRetryBackoffSecs).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
}
|
||||
|
||||
// expectRuntimeLookup mocks handleMockA2A's lookupRuntime query. The proxy
|
||||
// calls this on every dispatch to decide whether to short-circuit with a
|
||||
// canned mock reply; returning a non-mock runtime lets the request fall
|
||||
// through to the real agent path. The existing tests don't care about the
|
||||
// mock path but the query happens unconditionally, so the mock is required
|
||||
// to keep the test logs clean.
|
||||
func expectRuntimeLookup(mock sqlmock.Sqlmock, workspaceID string) {
|
||||
mock.ExpectQuery(
|
||||
"SELECT runtime FROM workspaces WHERE id = $1").
|
||||
WithArgs(workspaceID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("claude-code"))
|
||||
}
|
||||
|
||||
// expectRecentHeartbeatAbsent mocks hasRecentHeartbeat's query to return
|
||||
// NULL — DrainQueueForWorkspace treats that as "no recent heartbeat" and
|
||||
// falls through to MarkQueueItemFailed (the pre-fix behaviour). Used by
|
||||
// tests that exercise the dead-agent / non-transient failure paths.
|
||||
func expectRecentHeartbeatAbsent(mock sqlmock.Sqlmock, workspaceID string) {
|
||||
mock.ExpectQuery(
|
||||
"SELECT last_heartbeat_at FROM workspaces WHERE id = $1").
|
||||
WithArgs(workspaceID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(nil))
|
||||
}
|
||||
|
||||
// expectRecentHeartbeatPresent mocks hasRecentHeartbeat's query to return a
|
||||
// recent timestamp — DrainQueueForWorkspace treats that as "workspace is
|
||||
// alive" and the transient gateway-origin path becomes eligible. Used by
|
||||
// the regression test that pins the new behaviour.
|
||||
func expectRecentHeartbeatPresent(mock sqlmock.Sqlmock, workspaceID string) {
|
||||
mock.ExpectQuery(
|
||||
"SELECT last_heartbeat_at FROM workspaces WHERE id = $1").
|
||||
WithArgs(workspaceID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(time.Now()))
|
||||
}
|
||||
|
||||
// agentServer creates an httptest.Server that responds with the given status
|
||||
// and optional JSON body.
|
||||
func agentServer(body string, status int) *httptest.Server {
|
||||
@@ -379,6 +437,8 @@ func TestDrainQueueForWorkspace_ProxyErrResponseNil_NoPanic(t *testing.T) {
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
|
||||
|
||||
srv := agentServer("", http.StatusBadGateway)
|
||||
defer srv.Close()
|
||||
@@ -400,6 +460,11 @@ func TestDrainQueueForWorkspace_ProxyErrMissingErrorKey_UsesStatusText(t *testin
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
// 500 is NOT in isUpstreamDeadStatus so isGatewayOriginFailure returns
|
||||
// false and hasRecentHeartbeat is never consulted — no SQL mock needed
|
||||
// for the transient-retry path. Falls through to MarkQueueItemFailed
|
||||
// (the pre-fix behaviour for non-gateway failures).
|
||||
|
||||
srv := agentServer(`{"code":500,"detail":"internal server error"}`, http.StatusInternalServerError)
|
||||
defer srv.Close()
|
||||
@@ -421,6 +486,8 @@ func TestDrainQueueForWorkspace_ProxyErrNonStringError_NoPanic(t *testing.T) {
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
|
||||
|
||||
srv := agentServer(`{"error": 429}`, http.StatusServiceUnavailable)
|
||||
defer srv.Close()
|
||||
@@ -442,6 +509,8 @@ func TestDrainQueueForWorkspace_ProxyErrWithStringError_UsesErrorMessage(t *test
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
|
||||
|
||||
wantErrMsg := "upstream agent crashed with signal: killed"
|
||||
srv := agentServer(fmt.Sprintf(`{"error":%q}`, wantErrMsg), http.StatusBadGateway)
|
||||
@@ -478,7 +547,7 @@ func TestDrainQueueForWorkspace_DequeueError_LogsAndReturns(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectQuery(
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
"SELECT id, workspace_id, caller_id, priority, body::text, method, attempts FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued' AND (expires_at IS NULL OR expires_at > now()) AND (next_attempt_at IS NULL OR next_attempt_at <= now()) ORDER BY priority DESC, enqueued_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").
|
||||
WithArgs("ws-dequeue-err").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
mock.ExpectRollback()
|
||||
@@ -505,6 +574,11 @@ func TestDrainQueueForWorkspace_MaxAttempts_FailsRatherThanRetries(t *testing.T)
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
// No recent heartbeat → falls through to MarkQueueItemFailed (not the
|
||||
// transient-retry path). This pins the pre-fix behaviour for dead /
|
||||
// unreachable workspaces: the 5-attempt cap still fires after 5 retries.
|
||||
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
|
||||
|
||||
srv := agentServer(`{"error":"agent unreachable"}`, http.StatusBadGateway)
|
||||
defer srv.Close()
|
||||
@@ -550,3 +624,210 @@ func TestDrainQueueForWorkspace_ClaimGuarding_SecondDrainGetsEmpty(t *testing.T)
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 2026-06-21 PM RCA: transient gateway-retry path ====================
|
||||
//
|
||||
// The PM RCA found that DrainQueueForWorkspace was treating every
|
||||
// 502/503/504 from the upstream proxy as a "dead agent unreachable"
|
||||
// failure and burning the 5-attempt cap on otherwise-healthy
|
||||
// workspaces. The new path: when the workspace has a recent heartbeat
|
||||
// AND the failure is a gateway-origin dead-origin status (502/503/504
|
||||
// or 521/522/523/524), re-queue via MarkQueueItemTransientRetry which
|
||||
// does NOT advance the attempts counter, and invalidate the cached
|
||||
// agent URL so the next retry re-resolves it from the DB. Only
|
||||
// confirmed-dead agents (Classification="upstream_dead") and non-
|
||||
// gateway failures continue to use MarkQueueItemFailed.
|
||||
//
|
||||
// These four tests pin the new contract end-to-end: the new SQL
|
||||
// UPDATE statement, the URL cache invalidation, the heartbeat gate,
|
||||
// and the regression of the "dead agent" path under the same
|
||||
// conditions.
|
||||
|
||||
// TestDrainQueueForWorkspace_TransientGatewayFailure_StaysQueued: the
|
||||
// regression test for the RCA. Online workspace + queued item +
|
||||
// transient 502 (Cloudflare tunnel error page) + recent heartbeat →
|
||||
// MarkQueueItemTransientRetry (NOT MarkQueueItemFailed) so the
|
||||
// 5-attempt cap is preserved for actual dead-agent failures.
|
||||
func TestDrainQueueForWorkspace_TransientGatewayFailure_StaysQueued(t *testing.T) {
|
||||
item := drainItem("ws-gateway-blip")
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
// Recent heartbeat: the workspace is alive; the failure is in the
|
||||
// path between us and the agent, not the agent itself.
|
||||
expectRecentHeartbeatPresent(mock, item.WorkspaceID)
|
||||
|
||||
// Cloudflare 502 error page — empty body, no JSON. This is the
|
||||
// shape that triggered the RCA: a healthy workspace's A2A forward
|
||||
// hits a CDN tunnel blip and returns 502 with an HTML body.
|
||||
srv := agentServer(`<html>cloudflare error</html>`, http.StatusBadGateway)
|
||||
defer srv.Close()
|
||||
seedRedisURL(t, mr, item.WorkspaceID, srv.URL)
|
||||
|
||||
// Expect MarkQueueItemTransientRetry (NOT MarkQueueItemFailed). The
|
||||
// last_error string carries the "[transient gateway origin]" prefix
|
||||
// so the failure shape is auditable in the a2a_queue row.
|
||||
wantErrPrefix := "transient gateway origin (unknown, status=502):"
|
||||
expectTransientRetry(mock, item.ID, sqlmock.AnyArg()) // exact errMsg verified via DB below
|
||||
_ = wantErrPrefix
|
||||
|
||||
handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 1)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDrainQueueForWorkspace_TransientGatewayFailure_InvalidatesCachedURL:
|
||||
// on the transient-retry path, the cached agent URL must be evicted
|
||||
// from Redis so the next drain tick does a fresh DB lookup. Without
|
||||
// this, a stale URL pointing at a temporarily-flapped tunnel would
|
||||
// keep hitting the same broken endpoint. The ClearWorkspaceKeys call
|
||||
// removes the three ws:<id>:* keys (liveness, url, internal_url) in
|
||||
// one shot; the test verifies the url key is gone after the drain.
|
||||
func TestDrainQueueForWorkspace_TransientGatewayFailure_InvalidatesCachedURL(t *testing.T) {
|
||||
item := drainItem("ws-invalidate")
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
expectRecentHeartbeatPresent(mock, item.WorkspaceID)
|
||||
expectTransientRetry(mock, item.ID, sqlmock.AnyArg())
|
||||
|
||||
srv := agentServer("", http.StatusBadGateway)
|
||||
defer srv.Close()
|
||||
seedRedisURL(t, mr, item.WorkspaceID, srv.URL)
|
||||
|
||||
handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 1)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
|
||||
// Verify the cached URL was invalidated. seedRedisURL put it under
|
||||
// "ws:<id>:url" — after the drain it must be gone.
|
||||
if got, err := mr.Get(fmt.Sprintf("ws:%s:url", item.WorkspaceID)); err == nil && got != "" {
|
||||
t.Errorf("cached URL survived transient-retry invalidation: got=%q want empty", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDrainQueueForWorkspace_GatewayFailure_NoRecentHeartbeat_StillFails:
|
||||
// the heartbeat gate is the load-bearing part of the new path. If the
|
||||
// workspace is NOT heartbeating, a 502 stays a dead-agent failure —
|
||||
// we don't want to re-queue on a genuinely-dead workspace. This pins
|
||||
// the gate: gateway-origin status + no recent heartbeat →
|
||||
// MarkQueueItemFailed, same as the pre-fix behaviour.
|
||||
func TestDrainQueueForWorkspace_GatewayFailure_NoRecentHeartbeat_StillFails(t *testing.T) {
|
||||
item := drainItem("ws-no-hb")
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
// Same expectation sequence as the transient-retry tests up to the
// heartbeat probe; here the probe reports no recent heartbeat, so the
// final write expected is the failure path rather than a transient retry.
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
expectRecentHeartbeatAbsent(mock, item.WorkspaceID)
|
||||
// "Bad Gateway" is presumably matched against the error text recorded on
// the failed row — confirm against expectFailed.
expectFailed(mock, item.ID, "Bad Gateway")
|
||||
|
||||
// Stub agent built with http.StatusBadGateway, registered as the cached
// workspace URL so the drain dispatches to it.
srv := agentServer("", http.StatusBadGateway)
|
||||
defer srv.Close()
|
||||
seedRedisURL(t, mr, item.WorkspaceID, srv.URL)
|
||||
|
||||
handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 1)
|
||||
|
||||
// The only assertion: every registered DB expectation (including the
// failure write) was consumed.
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDrainQueueForWorkspace_UpstreamDead_BypassesTransientPath: when
|
||||
// the proxy already confirmed a dead container (Classification =
|
||||
// "upstream_dead", set by maybeMarkContainerDead in
|
||||
// handleA2ADispatchError), the transient-retry path is NOT eligible —
|
||||
// that is a real dead-agent failure and the 5-attempt cap MUST be
|
||||
// allowed to fire. This test pins that isGatewayOriginFailure
|
||||
// short-circuits on the "upstream_dead" classification and falls
|
||||
// through to MarkQueueItemFailed.
|
||||
func TestDrainQueueForWorkspace_UpstreamDead_BypassesTransientPath(t *testing.T) {
|
||||
// We cannot easily inject a proxyA2AError with Classification=
|
||||
// "upstream_dead" through the normal DrainQueueForWorkspace path
|
||||
// (the existing test infrastructure uses an httptest.Server for
|
||||
// the agent, which doesn't go through maybeMarkContainerDead).
|
||||
// So this test is a unit test of isGatewayOriginFailure itself,
|
||||
// which is the load-bearing predicate.
|
||||
// Case 1: 502 + Classification "upstream_dead" → predicate must be false.
upstreamDead := &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered"},
|
||||
Classification: "upstream_dead",
|
||||
}
|
||||
if isGatewayOriginFailure(upstreamDead) {
|
||||
t.Errorf("isGatewayOriginFailure(upstream_dead) = true, want false — confirmed-dead must bypass the transient-retry path")
|
||||
}
|
||||
|
||||
// Case 2 (the inverse): a 502 without the "upstream_dead" classification
|
||||
// IS a candidate for the transient-retry path.
|
||||
gatewayOrigin := &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "bad gateway"},
|
||||
}
|
||||
if !isGatewayOriginFailure(gatewayOrigin) {
|
||||
t.Errorf("isGatewayOriginFailure(502 + no classification) = false, want true — the predicate should recognise 502 as gateway-origin when the proxy has not confirmed dead")
|
||||
}
|
||||
|
||||
// Case 3: a 5xx that is not a gateway status (e.g., 500 internal agent
|
||||
// error) is NOT a gateway-origin failure.
|
||||
notGatewayOrigin := &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "agent crashed"},
|
||||
}
|
||||
if isGatewayOriginFailure(notGatewayOrigin) {
|
||||
t.Errorf("isGatewayOriginFailure(500) = true, want false — agent-authored 5xx is not a gateway-origin failure")
|
||||
}
|
||||
}
|
||||
|
||||
// TestDrainQueueForWorkspace_TransientRetry_BackoffBreaksCapacityLoop:
|
||||
// Regression test for Researcher #3127 REQUEST_CHANGES. The original
|
||||
// transient-retry fix requeued the row with status='queued' and no
|
||||
// backoff, so a capacity>1 DrainQueueForWorkspace could re-claim the
|
||||
// just-requeued row on the very next for-loop iteration and hit the
|
||||
// same gateway failure in a tight loop. The fix: next_attempt_at = now() + 5s
|
||||
// on transient retry, plus a WHERE clause in DequeueNext that skips
|
||||
// rows whose next_attempt_at is still in the future.
|
||||
//
|
||||
// This test pins the backoff: capacity=2, one queued item that hits a
|
||||
// transient 502, expect the second DequeueNext to return (nil, nil)
|
||||
// because the only item is now backoff-gated. Without the WHERE clause
|
||||
// the second DequeueNext would have re-claimed the row and the test
|
||||
// would fail (the re-claimed row would trigger a second budget check and
|
||||
// MarkQueueItemTransientRetry, for which no sqlmock expectations are registered).
|
||||
func TestDrainQueueForWorkspace_TransientRetry_BackoffBreaksCapacityLoop(t *testing.T) {
|
||||
item := drainItem("ws-capacity-loop")
|
||||
mock, handler, mr := drainSetup(t, item.WorkspaceID)
|
||||
|
||||
// Iteration 1 of the for-loop (capacity=2): the only queued row is
|
||||
// claimed, dispatched, and hits a transient 502. Recent heartbeat
|
||||
// keeps the transient-retry path eligible.
|
||||
expectDequeueNextOk(mock, item)
|
||||
expectQueueBudgetCheck(mock, item.WorkspaceID)
|
||||
expectRuntimeLookup(mock, item.WorkspaceID)
|
||||
expectRecentHeartbeatPresent(mock, item.WorkspaceID)
|
||||
|
||||
// Stub agent built with http.StatusBadGateway, seeded as the cached
// workspace URL so iteration 1 dispatches to it.
srv := agentServer("", http.StatusBadGateway)
|
||||
defer srv.Close()
|
||||
seedRedisURL(t, mr, item.WorkspaceID, srv.URL)
|
||||
|
||||
// Registered after the server setup but still ahead of the drain call,
// so it remains the fifth DB expectation in sequence.
expectTransientRetry(mock, item.ID, sqlmock.AnyArg())
|
||||
|
||||
// Iteration 2 of the for-loop (capacity=2): the just-requeued row
|
||||
// is still the highest-priority item, but next_attempt_at is now()
|
||||
// + 5s — DequeueNext's WHERE clause MUST skip it. The mock returns
|
||||
// sql.ErrNoRows as if the queue is empty, and the test framework
|
||||
// will fail if the second iteration ever calls into proxyA2ARequest
|
||||
// (no MarkQueueItemTransientRetry / MarkQueueItemFailed mock is
|
||||
// registered for it).
|
||||
// NOTE(review): the database is mocked, so the "row is skipped" outcome
// is supplied by expectDequeueNextEmpty rather than produced by a real
// query. The backoff WHERE clause is only pinned here if that helper's
// expected SQL text includes the next_attempt_at condition — confirm.
expectDequeueNextEmpty(mock, item.WorkspaceID)
|
||||
|
||||
handler.DrainQueueForWorkspace(context.Background(), item.WorkspaceID, 2)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
-- Down migration for the next_attempt_at backoff gate: remove the partial
-- index first, then the column it is built on. IF EXISTS on both statements
-- keeps the rollback safe to run when either object is already gone.
DROP INDEX IF EXISTS idx_a2a_queue_next_attempt_at;
|
||||
ALTER TABLE a2a_queue DROP COLUMN IF EXISTS next_attempt_at;
|
||||
@@ -0,0 +1,38 @@
|
||||
-- #3127 (Researcher follow-up): backoff for transient gateway-origin
|
||||
-- queue retries. Without a per-row "not before" gate, DrainQueueForWorkspace
|
||||
-- with capacity>1 can re-dispatch the same item inside the same call
|
||||
-- immediately after MarkQueueItemTransientRetry (because the row's
|
||||
-- status='queued' AND the for-loop in DrainQueueForWorkspace iterates up
|
||||
-- to capacity times). MarkQueueItemTransientRetry sets the new column to
|
||||
-- now() + 5s; DequeueNext's WHERE clause skips rows whose next_attempt_at
|
||||
-- is still in the future. This breaks the tight retry loop without
|
||||
-- requiring a schema-foreign "stop draining" branch.
|
||||
--
|
||||
-- Column is nullable: NULL = no backoff constraint (default state for
|
||||
-- rows that have never been transient-retried, and for the legacy
|
||||
-- MarkQueueItemFailed path that doesn't touch this column).
|
||||
--
|
||||
-- Index strategy: a NEW partial index on status = 'queued' AND next_attempt_at IS NOT NULL
|
||||
-- keyed by (workspace_id, next_attempt_at, priority DESC, enqueued_at ASC).
|
||||
-- The predicate is intentionally time-independent (no now() — PostgreSQL rejects
|
||||
-- non-IMMUTABLE functions in index predicates and the previous iteration of
|
||||
-- this migration used `next_attempt_at > now()` which fails DDL). The
|
||||
-- planner uses this index for the rare gated-row case; the existing
|
||||
-- idx_a2a_queue_dispatch covers the common NULL case via row-filter.
|
||||
-- next_attempt_at is included as a key column (not in the predicate)
|
||||
-- so the planner can range-scan it during the gated case.
|
||||
--
|
||||
-- #3127 PR follow-up (Researcher/CR2 REQUEST_CHANGES on 7df1b5e9):
|
||||
-- replaced the originally-proposed `next_attempt_at > now()` partial
|
||||
-- predicate with the time-independent `next_attempt_at IS NOT NULL` predicate.
|
||||
-- The predicate must reference only IMMUTABLE expressions; now() is
|
||||
-- STABLE, not IMMUTABLE, so the original index could not be created
|
||||
-- at deploy time.
|
||||
|
||||
-- Nullable, no DEFAULT: rows that have never been transient-retried carry
-- NULL, i.e. no backoff gate.
ALTER TABLE a2a_queue ADD COLUMN IF NOT EXISTS next_attempt_at TIMESTAMPTZ;
|
||||
|
||||
-- Partial index limited to queued rows that currently carry a backoff
-- timestamp.
-- NOTE(review): the dequeue query filters on
-- (next_attempt_at IS NULL OR next_attempt_at <= now()), which as a whole
-- does not imply next_attempt_at IS NOT NULL — confirm with EXPLAIN that
-- the planner can actually use this index for that query.
CREATE INDEX IF NOT EXISTS idx_a2a_queue_next_attempt_at
|
||||
ON a2a_queue (workspace_id, next_attempt_at, priority DESC, enqueued_at ASC)
|
||||
WHERE status = 'queued'
|
||||
AND next_attempt_at IS NOT NULL;
|
||||
|
||||
Reference in New Issue
Block a user