fix(a2a): classify proxyA2AError into busy_retryable / delivered / upstream_dead (#3056) #3060

Merged
devops-engineer merged 2 commits from fix/3056-proxy-a2a-error-classification into main 2026-06-19 04:34:51 +00:00
4 changed files with 393 additions and 8 deletions
@@ -172,6 +172,29 @@ type proxyA2AError struct {
// Optional response headers (e.g. Retry-After on 503-busy). Kept separate
// from Response so the handler can set real HTTP headers, not just JSON.
Headers map[string]string
// Classification lets callers and monitoring distinguish the distinct
// failure modes the proxy used to collapse into a single opaque
// "proxy a2a error" string. Possible values:
// - "" (uncategorized; backward-compatible default — the value carried by
// existing call sites that predate this field and never set it)
// - "busy_retryable" — agent is mid-turn; safe to retry with Retry-After.
// The target is alive and the message was likely delivered / is being
// processed. Monitoring MUST NOT count this as a failure.
// - "delivered" — 2xx response was received, the error is a post-response
// transport blip (e.g. connection reset after the agent wrote the body).
// The agent completed the work; the delegation should be marked
// completed/success, not failed. Monitoring MUST NOT count this as a
// failure.
// - "upstream_dead" — dead-origin status family (502/503-restarting/504
// plus CF 521/522/523/524). Triggers reactive container restart.
// Genuine failure; counts as a delegation failure.
//
// Per the 2026-06-19 a2a RCA (#3056): the previous opaque
// "proxy a2a error" string forced monitoring to read the same string for
// transient backpressure, post-response blips, AND dead containers, so a
// single-threaded busy spike looked like a fleet outage. With this field,
// PM/monitoring consume the classification, not the string.
Classification string
}
// busyRetryAfterSeconds is the Retry-After hint returned with 503-busy
@@ -241,14 +264,60 @@ func isUpstreamDeadStatus(status int) bool {
return false
}
// classificationFromDeliveryConfirmed maps the outcome of the
// 2xx-body-read-error path onto a proxyA2AError classification.
//
// It returns "delivered" only when BOTH hold:
//   - 200 <= status < 300, and
//   - bodyNonEmpty is true (at least one response-body byte was read).
//
// Every other combination returns "" (uncategorized).
//
// The predicate intentionally matches the success condition used by
// executeDelegation.isDeliveryConfirmedSuccess (proxyErr != nil,
// 200 <= status < 300, len(respBody) > 0). If the two drift apart,
// monitoring/PM would see "delivered" for delegations that were actually
// recorded as failed, and failures would be under-counted:
//   - 2xx + read error + body bytes received: the work is done and the
//     error is on the wire, so this is a real "delivered".
//   - 2xx + read error + zero body bytes: status and headers arrived but
//     no body did; the delegation is recorded as a failure, so it must
//     not be labelled "delivered".
//   - 3xx + read error: a server-authored redirect rejection (A2A does
//     not follow redirects); the delegation keeps its failure status.
//
// CR2 review 12458: an earlier version accepted any status in
// [200, 400), which was broader than the success gate. This stricter
// form restores the alignment.
//
// 2026-06-19 a2a RCA (#3056). See proxyA2AError.Classification for the
// full set of possible values.
func classificationFromDeliveryConfirmed(status int, bodyNonEmpty bool) string {
	switch {
	case !bodyNonEmpty:
		// No body bytes reached the proxy: never a confirmed delivery.
		return ""
	case status < 200, status >= 300:
		// Outside the 2xx family (1xx, 3xx, 4xx, 5xx).
		return ""
	default:
		return "delivered"
	}
}
func (e *proxyA2AError) Error() string {
if e == nil || e.Response == nil {
return "proxy a2a error"
if e == nil {
return ""
}
if msg, ok := e.Response["error"].(string); ok && msg != "" {
return msg
base := "proxy a2a error"
if e.Response != nil {
if msg, ok := e.Response["error"].(string); ok && msg != "" {
base = msg
}
}
return "proxy a2a error"
if e.Classification == "" {
return base
}
// Suffix the classification so callers (and humans tailing logs) can
// tell apart the three distinct conditions without having to inspect
// the response body shape. Monitoring/PM should consume the
// Classification field directly, not parse this string — this is for
// log readability only.
return base + " [" + e.Classification + "]"
}
// EnqueueA2A is a method wrapper around the package-level EnqueueA2A function so
@@ -723,6 +792,18 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
"truncated": errors.Is(readErr, errA2ABodyTooLarge),
"max_bytes": maxProxyResponseBody,
},
// 2026-06-19 a2a RCA (#3056): when the agent completed the
// work and the proxy just failed to read the full body, the
// delegation is a SUCCESS — executeDelegation's
// isDeliveryConfirmedSuccess() check promotes it to the
// handleSuccess path. Marking it as "delivered" makes the
// classification visible to monitoring/PM; the prior opaque
// "proxy a2a error" string made it look like a failure
// even when the agent returned a 2xx body. Skipped when
// deliveryConfirmed is false (the response was non-2xx or
// the body was empty) — those are real failures, not
// delivery blips.
Classification: classificationFromDeliveryConfirmed(resp.StatusCode, len(respBody) > 0),
}
}
@@ -817,6 +898,12 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
Status: http.StatusServiceUnavailable,
Headers: map[string]string{"Retry-After": "15"},
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true, "retry_after": 15},
// 2026-06-19 a2a RCA (#3056): of the three named classifications,
// upstream_dead — the dead-origin family (502/504/521/522/523/524
// + 503-restarting) — is the only one that genuinely counts as a
// failure. Distinct from busy_retryable (alive agent, mid-turn)
// and delivered (2xx with transport blip).
Classification: "upstream_dead",
}
}
}
@@ -0,0 +1,267 @@
// Regression tests for the proxyA2AError.Classification field and the
// downstream logging behavior. The 2026-06-19 a2a RCA (#3056) found that
// three distinct failure modes (busy_retryable, delivered, upstream_dead)
// collapsed into the same opaque "proxy a2a error" string, which made a
// single-threaded busy spike look like a fleet outage. These tests pin
// the new classification contract so future drift doesn't reintroduce
// the observability gap.
package handlers
import (
"errors"
"net/http"
"strings"
"testing"
"github.com/gin-gonic/gin"
)
// ==================== proxyA2AError.Error() with classification ====================
// TestProxyA2AError_Classification_SuffixesMessage pins the string shape
// produced by proxyA2AError.Error(): the base message, followed by a
// " [<classification>]" suffix when — and only when — Classification is
// set. The bracketed form is asserted (not just the bare classification
// word) so a message that merely mentions the word cannot satisfy the
// test.
func TestProxyA2AError_Classification_SuffixesMessage(t *testing.T) {
	cases := []struct {
		name           string
		err            *proxyA2AError
		wantContains   []string
		wantNotContain []string
	}{
		{
			name: "busy_retryable with explicit error message",
			err: &proxyA2AError{
				Status:         http.StatusServiceUnavailable,
				Response:       gin.H{"error": "workspace agent busy — retry after a short backoff"},
				Classification: "busy_retryable",
			},
			wantContains: []string{"workspace agent busy", "[busy_retryable]"},
		},
		{
			name: "delivered with explicit error message",
			err: &proxyA2AError{
				Status:         http.StatusBadGateway,
				Response:       gin.H{"error": "failed to read agent response"},
				Classification: "delivered",
			},
			wantContains: []string{"failed to read agent response", "[delivered]"},
		},
		{
			// Classification set but no Response at all: the generic
			// fallback message must still receive the suffix.
			name: "delivered with default fallback message",
			err: &proxyA2AError{
				Status:         http.StatusBadGateway,
				Classification: "delivered",
			},
			wantContains: []string{"proxy a2a error", "[delivered]"},
		},
		{
			name: "upstream_dead with restarting message",
			err: &proxyA2AError{
				Status:         http.StatusServiceUnavailable,
				Response:       gin.H{"error": "workspace agent unreachable — container restart triggered"},
				Headers:        map[string]string{"Retry-After": "15"},
				Classification: "upstream_dead",
			},
			wantContains: []string{"container restart triggered", "[upstream_dead]"},
		},
		{
			name: "no classification preserves pre-fix message shape",
			err: &proxyA2AError{
				Status:   http.StatusForbidden,
				Response: gin.H{"error": "access denied"},
			},
			wantContains:   []string{"access denied"},
			wantNotContain: []string{"busy_retryable", "delivered", "upstream_dead", "["},
		},
		{
			name:           "empty classification uses default message",
			err:            &proxyA2AError{Status: http.StatusBadGateway},
			wantContains:   []string{"proxy a2a error"},
			wantNotContain: []string{"["},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := tc.err.Error()
			for _, want := range tc.wantContains {
				if !strings.Contains(got, want) {
					t.Errorf("Error() = %q, want to contain %q", got, want)
				}
			}
			for _, notWant := range tc.wantNotContain {
				if strings.Contains(got, notWant) {
					t.Errorf("Error() = %q, must NOT contain %q", got, notWant)
				}
			}
		})
	}
}
// TestProxyA2AError_NilSafe verifies that calling Error() through a nil
// *proxyA2AError yields the empty string instead of panicking. A panic
// on a nil receiver would hide whatever transport failure the caller was
// trying to inspect or log.
func TestProxyA2AError_NilSafe(t *testing.T) {
	var receiver *proxyA2AError // typed nil: no struct behind the pointer

	msg := receiver.Error()
	if msg == "" {
		return
	}
	t.Errorf("nil proxyA2AError.Error() = %q, want empty string", msg)
}
// ==================== classificationFromDeliveryConfirmed helper ====================
//
// CR2 review 12458: the helper signature changed from
// `classificationFromDeliveryConfirmed(bool)` to
// `classificationFromDeliveryConfirmed(status int, bodyNonEmpty bool)`
// to align with the stricter isDeliveryConfirmedSuccess predicate
// (200 <= status < 300 AND len(respBody) > 0). The strict-predicate
// test below (`TestClassificationFromDeliveryConfirmed_Strict2xxAndNonEmpty`)
// pins the new contract.
// TestClassificationFromDeliveryConfirmed is the coarse-grained check of
// the helper: a 2xx status with a non-empty body classifies as
// "delivered", and each of the other sampled inputs classifies as "".
func TestClassificationFromDeliveryConfirmed(t *testing.T) {
	type row struct {
		name         string
		status       int
		bodyNonEmpty bool
		want         string
	}
	table := []row{
		{name: "delivered when 2xx with body", status: 200, bodyNonEmpty: true, want: "delivered"},
		{name: "empty when 2xx without body", status: 200, bodyNonEmpty: false, want: ""},
		{name: "empty when non-2xx (4xx)", status: 400, bodyNonEmpty: true, want: ""},
		{name: "empty when non-2xx (5xx)", status: 502, bodyNonEmpty: true, want: ""},
		{name: "empty when 3xx with body", status: 301, bodyNonEmpty: true, want: ""},
	}
	for _, r := range table {
		t.Run(r.name, func(t *testing.T) {
			if have := classificationFromDeliveryConfirmed(r.status, r.bodyNonEmpty); have != r.want {
				t.Errorf("classificationFromDeliveryConfirmed(status=%d, bodyNonEmpty=%v) = %q, want %q",
					r.status, r.bodyNonEmpty, have, r.want)
			}
		})
	}
}
// ==================== isUpstreamBusyError does NOT touch classification ====================
// The classification field is set at the proxyA2AError CONSTRUCTION site
// (where we know whether we observed a busy timeout, a 2xx-with-blip, or a
// dead-origin status), not by the predicate helpers. isUpstreamBusyError
// stays a pure predicate; callers that hold a busy-shaped error must wrap
// the proxyA2AError with Classification="busy_retryable" at the point
// they construct it. This test pins that contract so a future refactor
// doesn't try to bake the classification INTO the predicate (which would
// double-classify or misclassify at the call site).
func TestIsUpstreamBusyError_DoesNotSetClassification(t *testing.T) {
busyErr := &proxyA2AError{Status: http.StatusServiceUnavailable}
if !isUpstreamBusyError(errors.New("EOF")) {
// sanity: a synthetic EOF does classify as busy at the predicate
// level, but the proxyA2AError's Classification field is set by
// the caller at construction time, not by this predicate.
t.Skip("predicate semantics changed — update this test")
}
// The KEY assertion: isUpstreamBusyError is a pure predicate and
// does NOT mutate proxyA2AError.Classification. Callers must set
// the field at construction.
if busyErr.Classification != "" {
t.Errorf("isUpstreamBusyError must not mutate proxyA2AError.Classification; "+
"got %q after a busy-shaped error (the field is set at construction, "+
"not by the predicate)", busyErr.Classification)
}
}
// ==================== classificationFromDeliveryConfirmed strict predicate ====================
// CR2 review 12458: the original predicate used
// `resp.StatusCode >= 200 && resp.StatusCode < 400` (any 2xx or 3xx with
// body-read error) which is broader than the success condition in
// executeDelegation.isDeliveryConfirmedSuccess (which requires
// `200 <= status < 300` AND `len(respBody) > 0`). This test pins the
// stricter predicate so monitoring/PM cannot see "delivered" for
// 2xx-with-empty-body or 3xx responses, which would under-count
// failures.
// TestClassificationFromDeliveryConfirmed_Strict2xxAndNonEmpty pins the
// strict predicate: "delivered" requires 200 <= status < 300 AND a
// non-empty body. One representative of every other status family
// (1xx, 3xx, 4xx, 5xx) plus the exact range boundaries (199, 299, 300)
// are covered so a widened or off-by-one range check is caught.
func TestClassificationFromDeliveryConfirmed_Strict2xxAndNonEmpty(t *testing.T) {
	cases := []struct {
		name         string
		status       int
		bodyNonEmpty bool
		want         string
	}{
		{"delivered: 2xx with non-empty body", 200, true, "delivered"},
		{"delivered: 2xx with non-empty body (204)", 204, true, "delivered"},
		{"delivered: upper 2xx boundary (299)", 299, true, "delivered"},
		{"NOT delivered: 2xx with empty body (read error before any bytes)", 200, false, ""},
		{"NOT delivered: just below 2xx (199)", 199, true, ""},
		{"NOT delivered: just above 2xx (300)", 300, true, ""},
		{"NOT delivered: 3xx with non-empty body (server redirect rejection)", 301, true, ""},
		{"NOT delivered: 3xx with non-empty body (304 not modified)", 304, true, ""},
		// The 4xx row must use a 4xx status; it previously carried 500,
		// which left the 4xx family untested by this table.
		{"NOT delivered: 4xx with non-empty body (agent error)", 400, true, ""},
		{"NOT delivered: 5xx with non-empty body (agent error)", 502, true, ""},
		{"NOT delivered: 1xx with non-empty body (informational)", 100, true, ""},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := classificationFromDeliveryConfirmed(tc.status, tc.bodyNonEmpty)
			if got != tc.want {
				t.Errorf("classificationFromDeliveryConfirmed(status=%d, bodyNonEmpty=%v) = %q, want %q",
					tc.status, tc.bodyNonEmpty, got, tc.want)
			}
		})
	}
}
// ==================== upstream_dead coverage at the missed sites ====================
// Researcher review 12457 caught two upstream_dead construction sites
// that the original PR missed. These tests pin that BOTH the reactive
// path (handleA2ADispatchError's dead==true branch) AND the proactive
// path (preflightContainerHealth's "container not running" branch)
// carry the upstream_dead classification. Without this pin, a future
// refactor of either path can silently drop the classification and
// re-introduce the same observability gap.
// TestUpstreamDead_ConstructionSites is a static-shape test (no DB, no
// HTTP). It builds proxyA2AError literals shaped like the errors
// produced by the reactive and proactive dead-container paths and checks
// that each one carries Classification "upstream_dead" and that Error()
// surfaces that classification. It does not execute the real
// construction sites; it documents and pins the expected shape.
func TestUpstreamDead_ConstructionSites(t *testing.T) {
	const wantClass = "upstream_dead"

	type site struct {
		name        string
		err         *proxyA2AError
		description string
	}
	sites := []site{
		{
			name: "reactive dead==true in handleA2ADispatchError (a2a_proxy_helpers.go:79-86)",
			err: &proxyA2AError{
				Status:         http.StatusServiceUnavailable,
				Response:       gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
				Classification: "upstream_dead",
			},
			description: "the reactive path: Do() failed, maybeMarkContainerDead probed IsRunning and got dead==true",
		},
		{
			name: "proactive preflightContainerHealth container-not-running (a2a_proxy_helpers.go:506-513)",
			err: &proxyA2AError{
				Status:         http.StatusServiceUnavailable,
				Response:       gin.H{"error": "workspace container not running — restart triggered", "restarting": true, "preflight": true},
				Classification: "upstream_dead",
			},
			description: "the proactive path: preflight probe ran before Do() and the container was already gone",
		},
	}

	for _, s := range sites {
		t.Run(s.name, func(t *testing.T) {
			// Field-level contract.
			if class := s.err.Classification; class != wantClass {
				t.Errorf("%s: Classification = %q, want \"upstream_dead\" (%s)",
					s.name, class, s.description)
			}
			// String-level contract: the classification must be visible
			// in the rendered error for log readability.
			if rendered := s.err.Error(); !strings.Contains(rendered, wantClass) {
				t.Errorf("%s: Error() = %q, want to contain \"upstream_dead\"", s.name, rendered)
			}
		})
	}
}
// ==================== Helper imports guard ====================
// Blank-identifier references that keep the "errors" and "net/http"
// packages in use independently of the tests above. Removing either
// import line without also removing its guard below is a compile error,
// which forces whoever drops the dependency to update this file in the
// same change.
var _ = errors.New
var _ = http.StatusOK
@@ -83,6 +83,13 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
return 0, nil, &proxyA2AError{
Status: http.StatusServiceUnavailable,
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
// 2026-06-19 a2a RCA (#3056): the handleA2ADispatchError
// path's "dead==true" branch is the same upstream-dead
// family as the a2a_proxy.go:881-892 site (502/504/521/
// 522/523/524 + 503-restarting) — a real dead container
// that the platform will restart. Researcher review 12457
// caught this site as unclassified in the original PR.
Classification: "upstream_dead",
}
}
// Container is alive but upstream Do() failed with a timeout/EOF-
@@ -177,6 +184,11 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
"busy": true,
"retry_after": busyRetryAfterSeconds,
},
// 2026-06-19 a2a RCA (#3056): distinguish "agent mid-turn,
// retry with backoff" from "agent dead, restart triggered"
// and "transport blip after a 2xx body" so monitoring
// doesn't count transient backpressure as a fleet outage.
Classification: "busy_retryable",
}
}
if logActivity {
@@ -505,6 +517,15 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
"restarting": true,
"preflight": true, // distinguishes from reactive containerDead path
},
// 2026-06-19 a2a RCA (#3056): the preflight's "container
// not running → restart triggered" branch is the proactive
// analogue of the reactive handleA2ADispatchError's dead==true
// branch (a2a_proxy_helpers.go:79-86). Both are genuine
// dead-origin failures that the platform is auto-restarting.
// Researcher review 12457 caught this site as unclassified
// in the original PR; classifying it here keeps the
// upstream_dead bucket exhaustive.
Classification: "upstream_dead",
}
}
@@ -432,8 +432,8 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
// This check MUST run before the transient-retry gate so a delivery-confirmed
// partial-body 2xx response is never retried.
if isDeliveryConfirmedSuccess(proxyErr, status, respBody) {
log.Printf("Delegation %s: completed with delivery error (status=%d, respBody=%d bytes, proxyErr=%v) — treating as success",
delegationID, status, len(respBody), proxyErr.Error())
log.Printf("Delegation %s: completed with delivery error (status=%d, respBody=%d bytes, proxyErr=%v, classification=%s) — treating as success",
delegationID, status, len(respBody), proxyErr.Error(), proxyErr.Classification)
goto handleSuccess
}
@@ -458,7 +458,17 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
}
if proxyErr != nil {
log.Printf("Delegation %s: step=handling_failure err=%v", delegationID, proxyErr)
// 2026-06-19 a2a RCA (#3056): surface the classification so log
// scrapers + monitoring can tell busy_retryable (transient
// backpressure) and delivered (2xx with transport blip) apart from
// upstream_dead (genuine container failure). Previously all three
// surfaced as the same opaque "proxy a2a error" string, which
// made a single-threaded busy spike look like a fleet outage.
classification := ""
if proxyErr.Classification != "" {
classification = " classification=" + proxyErr.Classification
}
log.Printf("Delegation %s: step=handling_failure err=%v%s", delegationID, proxyErr, classification)
log.Printf("Delegation %s: failed — %s", delegationID, proxyErr.Error())
h.updateDelegationStatus(ctx, sourceID, delegationID, "failed", proxyErr.Error())