forked from molecule-ai/molecule-core
fix(a2a): surface delivery_confirmed + prevent 503-busy double-delivery (#689)
Two targeted fixes for the A2A false-negative (delivery succeeded but caller
receives A2A_ERROR):
Body-read failure: when Do() succeeds (target sent 2xx headers — delivery
confirmed) but io.ReadAll(resp.Body) fails, proxy now returns
{"delivery_confirmed": true} in the 502 body and logs the activity as
successful. Audit trail records true delivery, not a false failed entry.
isTransientProxyError fix: delegation retry loop now only retries 503s with
{restarting: true} (container died, message NOT delivered). 503 {busy: true}
signals the agent IS processing the delivered message — retrying causes
double-delivery. Fix prevents the double-delivery race.
All 16 packages pass: go test ./...
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
572b314c4e
commit
108d257833
@ -275,11 +275,27 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read agent response (capped at 10MB)
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if err != nil {
|
||||
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if readErr != nil {
|
||||
// Do() succeeded, which means the target received the request and sent
|
||||
// back response headers — delivery is confirmed. The body couldn't be
|
||||
// fully read (connection drop, timeout mid-stream). Surface
|
||||
// delivery_confirmed so callers can distinguish "not delivered" from
|
||||
// "delivered, but response body lost" (#689). When delivery is confirmed,
|
||||
// log the activity as successful (delivery happened) rather than leaving
|
||||
// a false "failed" entry in the audit trail.
|
||||
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
||||
if logActivity && deliveryConfirmed {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to read agent response"},
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{
|
||||
"error": "failed to read agent response",
|
||||
"delivery_confirmed": deliveryConfirmed,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -603,6 +603,83 @@ func TestProxyA2AError_BusyShape(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== ProxyA2A — body-read failure (delivery_confirmed) #689 ====================
|
||||
//
|
||||
// When Do() succeeds (target sent 2xx headers — delivery confirmed) but reading
|
||||
// the response body fails (connection drop, mid-stream timeout), the proxy must:
|
||||
// 1. Return 502 (caller can't get the response content)
|
||||
// 2. Include "delivery_confirmed": true in the error body so callers can
|
||||
// distinguish "not delivered" from "delivered, response body lost".
|
||||
|
||||
func TestProxyA2A_BodyReadFailure_DeliveryConfirmed(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Agent server: sends 200 OK headers + partial body, then closes the
|
||||
// connection abruptly to simulate a mid-stream read failure.
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Flush 200 headers immediately so Do() returns (resp, nil).
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
// Write partial JSON — just enough to prove the body was started,
|
||||
// then hijack and close the connection so ReadAll fails.
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
io.WriteString(w, `{"result": "partial`) //nolint:errcheck
|
||||
flusher.Flush()
|
||||
}
|
||||
// Hijack the underlying TCP connection and close it to simulate
|
||||
// a mid-stream drop that causes io.ReadAll to return an error.
|
||||
if hj, ok := w.(http.Hijacker); ok {
|
||||
conn, _, _ := hj.Hijack()
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
wsID := "ws-bodyreadfail"
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL)
|
||||
|
||||
// Expect async activity log INSERT (logA2ASuccess is called because
|
||||
// delivery_confirmed is true and the handler detected a 2xx status).
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"ping"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.ProxyA2A(c)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Expect 502 (couldn't deliver the response content to the caller)
|
||||
if w.Code != http.StatusBadGateway {
|
||||
t.Errorf("expected 502, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("body not JSON: %v", err)
|
||||
}
|
||||
// delivery_confirmed must be true — Do() returned 2xx headers.
|
||||
if v, _ := resp["delivery_confirmed"].(bool); !v {
|
||||
t.Errorf(`expected "delivery_confirmed": true in response, got: %v`, resp)
|
||||
}
|
||||
if _, hasErr := resp["error"]; !hasErr {
|
||||
t.Errorf(`expected "error" field in response body`)
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== validateCallerToken — Phase 30.5 ====================
|
||||
|
||||
// The A2A proxy validates the *caller's* token (not the target's) when the
|
||||
|
||||
@ -486,22 +486,34 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) {
|
||||
|
||||
// --- helpers ---
|
||||
|
||||
// isTransientProxyError returns true when the proxy error looks like a
|
||||
// restart-race condition worth retrying (connection refused, EOF, stale
|
||||
// URL pointing at a dead ephemeral port, container-restart-triggered
|
||||
// 503). Static 4xx errors (bad request, access denied, not found) are
|
||||
// NOT retried — retrying them wastes the 8-second delay for no benefit.
|
||||
// isTransientProxyError returns true when the proxy error is a restart-race
|
||||
// condition worth retrying (connection refused, stale ephemeral-port URL after
|
||||
// a container restart). Static 4xx and generic 5xx errors are NOT retried.
|
||||
//
|
||||
// 503 requires careful splitting (#689): the proxy emits two distinct 503 shapes
|
||||
// that must be handled differently:
|
||||
// - "restarting: true" — container was dead; restart triggered. The POST body
|
||||
// was never delivered (dead container can't accept TCP). Safe to retry.
|
||||
// - "busy: true" — agent is alive, mid-synthesis on a previous request. The
|
||||
// POST body WAS likely delivered. Retrying double-delivers the message.
|
||||
// Do NOT retry; surface the 503 to the caller instead.
|
||||
func isTransientProxyError(err *proxyA2AError) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
// 503 is the explicit "container unreachable / restart triggered"
|
||||
// response from a2a_proxy.go after its reactive health check.
|
||||
// 502 is "failed to reach workspace agent" — the pre-reactive-check
|
||||
// error for plain connection failures.
|
||||
if err.Status == http.StatusServiceUnavailable || err.Status == http.StatusBadGateway {
|
||||
// 502 = "failed to reach workspace agent" (connection refused / DNS failure).
|
||||
// The message was NOT delivered. Safe to retry after reactive URL refresh (#74).
|
||||
if err.Status == http.StatusBadGateway {
|
||||
return true
|
||||
}
|
||||
// 503 with restarting:true = container died → message not delivered → retry.
|
||||
// 503 with busy:true (or no flag) = agent alive → message may be delivered → no retry.
|
||||
if err.Status == http.StatusServiceUnavailable {
|
||||
if restart, ok := err.Response["restarting"].(bool); ok && restart {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@ -344,9 +344,19 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) {
|
||||
expect bool
|
||||
}{
|
||||
{"nil", nil, false},
|
||||
{"503 service unavailable (container restart triggered)",
|
||||
&proxyA2AError{Status: http.StatusServiceUnavailable}, true},
|
||||
{"502 bad gateway (connection refused)",
|
||||
// 503 with restarting:true — container was dead; restart triggered.
|
||||
// Message was NOT delivered (dead container). Safe to retry (#74).
|
||||
{"503 container restart triggered — retry",
|
||||
&proxyA2AError{Status: http.StatusServiceUnavailable, Response: gin.H{"restarting": true}}, true},
|
||||
// 503 with busy:true — agent is alive, mid-synthesis on the delivered
|
||||
// message. Retrying would double-deliver (#689). Must NOT retry.
|
||||
{"503 agent busy (double-delivery risk) — no retry",
|
||||
&proxyA2AError{Status: http.StatusServiceUnavailable, Response: gin.H{"busy": true, "retry_after": 30}}, false},
|
||||
// 503 with no qualifying flag — conservative: don't retry.
|
||||
{"503 plain (no restarting flag) — no retry",
|
||||
&proxyA2AError{Status: http.StatusServiceUnavailable}, false},
|
||||
// 502 = connection refused = message not delivered → safe to retry.
|
||||
{"502 bad gateway (connection refused) — retry",
|
||||
&proxyA2AError{Status: http.StatusBadGateway}, true},
|
||||
{"404 workspace not found",
|
||||
&proxyA2AError{Status: http.StatusNotFound}, false},
|
||||
|
||||
Loading…
Reference in New Issue
Block a user