diff --git a/platform/internal/handlers/a2a_proxy.go b/platform/internal/handlers/a2a_proxy.go index f7664b22..99e91478 100644 --- a/platform/internal/handlers/a2a_proxy.go +++ b/platform/internal/handlers/a2a_proxy.go @@ -275,11 +275,27 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri defer resp.Body.Close() // Read agent response (capped at 10MB) - respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody)) - if err != nil { + respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody)) + if readErr != nil { + // Do() succeeded, which means the target received the request and sent + // back response headers; delivery is treated as confirmed only for a + // 2xx/3xx status (see deliveryConfirmed below). The body couldn't be + // fully read (connection drop, timeout mid-stream). Surface + // delivery_confirmed so callers can distinguish "not delivered" from + // "delivered, but response body lost" (#689). When delivery is confirmed, + // log the activity as successful (delivery happened) rather than leaving + // a false "failed" entry in the audit trail. 
+ deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400 + log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", + workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) + if logActivity && deliveryConfirmed { + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + } return 0, nil, &proxyA2AError{ - Status: http.StatusBadGateway, - Response: gin.H{"error": "failed to read agent response"}, + Status: http.StatusBadGateway, + Response: gin.H{ + "error": "failed to read agent response", + "delivery_confirmed": deliveryConfirmed, + }, } } diff --git a/platform/internal/handlers/a2a_proxy_test.go b/platform/internal/handlers/a2a_proxy_test.go index 7de89c31..7d731d76 100644 --- a/platform/internal/handlers/a2a_proxy_test.go +++ b/platform/internal/handlers/a2a_proxy_test.go @@ -603,6 +603,83 @@ func TestProxyA2AError_BusyShape(t *testing.T) { } } +// ==================== ProxyA2A — body-read failure (delivery_confirmed) #689 ==================== +// +// When Do() succeeds (target sent 2xx headers — delivery confirmed) but reading +// the response body fails (connection drop, mid-stream timeout), the proxy must: +// 1. Return 502 (caller can't get the response content) +// 2. Include "delivery_confirmed": true in the error body so callers can +// distinguish "not delivered" from "delivered, response body lost". + +func TestProxyA2A_BodyReadFailure_DeliveryConfirmed(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + // Agent server: sends 200 OK headers + partial body, then closes the + // connection abruptly to simulate a mid-stream read failure. 
+ agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Flush 200 headers immediately so Do() returns (resp, nil). + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + // Write partial JSON — just enough to prove the body was started, + // then hijack and close the connection so ReadAll fails. + if flusher, ok := w.(http.Flusher); ok { + io.WriteString(w, `{"result": "partial`) //nolint:errcheck + flusher.Flush() + } + // Hijack the underlying TCP connection and close it to simulate + // a mid-stream drop that causes io.ReadAll to return an error. + if hj, ok := w.(http.Hijacker); ok { + conn, _, _ := hj.Hijack() + if conn != nil { + conn.Close() + } + } + })) + defer agentServer.Close() + + wsID := "ws-bodyreadfail" + mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL) + + // Expect async activity log INSERT (logA2ASuccess is called because + // delivery_confirmed is true and the handler detected a 2xx status). + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"ping"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + time.Sleep(50 * time.Millisecond) + + // Expect 502 (couldn't deliver the response content to the caller) + if w.Code != http.StatusBadGateway { + t.Errorf("expected 502, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("body not JSON: %v", err) + } + // delivery_confirmed must be true — Do() returned 2xx headers. 
+ if v, _ := resp["delivery_confirmed"].(bool); !v { + t.Errorf(`expected "delivery_confirmed": true in response, got: %v`, resp) + } + if _, hasErr := resp["error"]; !hasErr { + t.Errorf(`expected "error" field in response body`) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + // ==================== validateCallerToken — Phase 30.5 ==================== // The A2A proxy validates the *caller's* token (not the target's) when the diff --git a/platform/internal/handlers/delegation.go b/platform/internal/handlers/delegation.go index 89fd2220..9ca07107 100644 --- a/platform/internal/handlers/delegation.go +++ b/platform/internal/handlers/delegation.go @@ -486,22 +486,34 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) { // --- helpers --- -// isTransientProxyError returns true when the proxy error looks like a -// restart-race condition worth retrying (connection refused, EOF, stale -// URL pointing at a dead ephemeral port, container-restart-triggered -// 503). Static 4xx errors (bad request, access denied, not found) are -// NOT retried — retrying them wastes the 8-second delay for no benefit. +// isTransientProxyError returns true when the proxy error is a restart-race +// condition worth retrying (connection refused, stale ephemeral-port URL after +// a container restart). Static 4xx and generic 5xx errors are NOT retried. +// +// 503 requires careful splitting (#689): the proxy emits two distinct 503 shapes +// that must be handled differently: +// - "restarting: true" — container was dead; restart triggered. The POST body +// was never delivered (dead container can't accept TCP). Safe to retry. +// - "busy: true" — agent is alive, mid-synthesis on a previous request. The +// POST body WAS likely delivered. Retrying double-delivers the message. +// Do NOT retry; surface the 503 to the caller instead. 
func isTransientProxyError(err *proxyA2AError) bool { if err == nil { return false } - // 503 is the explicit "container unreachable / restart triggered" - // response from a2a_proxy.go after its reactive health check. - // 502 is "failed to reach workspace agent" — the pre-reactive-check - // error for plain connection failures. - if err.Status == http.StatusServiceUnavailable || err.Status == http.StatusBadGateway { + // 502 = "failed to reach workspace agent" (connection refused / DNS failure). + // The message was NOT delivered. Safe to retry after reactive URL refresh (#74). + if err.Status == http.StatusBadGateway { return true } + // 503 with restarting:true = container died → message not delivered → retry. + // 503 with busy:true (or no flag) = agent alive → message may be delivered → no retry. + if err.Status == http.StatusServiceUnavailable { + if restart, ok := err.Response["restarting"].(bool); ok && restart { + return true + } + return false + } return false } diff --git a/platform/internal/handlers/delegation_test.go b/platform/internal/handlers/delegation_test.go index 094b419b..caa5118d 100644 --- a/platform/internal/handlers/delegation_test.go +++ b/platform/internal/handlers/delegation_test.go @@ -344,9 +344,19 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) { expect bool }{ {"nil", nil, false}, - {"503 service unavailable (container restart triggered)", - &proxyA2AError{Status: http.StatusServiceUnavailable}, true}, - {"502 bad gateway (connection refused)", + // 503 with restarting:true — container was dead; restart triggered. + // Message was NOT delivered (dead container). Safe to retry (#74). + {"503 container restart triggered — retry", + &proxyA2AError{Status: http.StatusServiceUnavailable, Response: gin.H{"restarting": true}}, true}, + // 503 with busy:true — agent is alive, mid-synthesis on the delivered + // message. Retrying would double-deliver (#689). Must NOT retry. 
+ {"503 agent busy (double-delivery risk) — no retry", + &proxyA2AError{Status: http.StatusServiceUnavailable, Response: gin.H{"busy": true, "retry_after": 30}}, false}, + // 503 with no qualifying flag — conservative: don't retry. + {"503 plain (no restarting flag) — no retry", + &proxyA2AError{Status: http.StatusServiceUnavailable}, false}, + // 502 = connection refused = message not delivered → safe to retry. + {"502 bad gateway (connection refused) — retry", &proxyA2AError{Status: http.StatusBadGateway}, true}, {"404 workspace not found", &proxyA2AError{Status: http.StatusNotFound}, false},