fix(a2a): detect and fail loud on proxy body truncation (core#2677) #2681

Merged
devops-engineer merged 4 commits from fix/a2a-proxy-body-truncation-2677 into main 2026-06-13 00:39:26 +00:00
2 changed files with 222 additions and 15 deletions
+52 -15
View File
@@ -11,6 +11,7 @@ import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
@@ -73,8 +74,8 @@ func setPlatformInDockerForTest(v bool) func() {
return func() { platformInDocker = prev }
}
// maxProxyRequestBody is the maximum size of an A2A proxy request body (1MB).
const maxProxyRequestBody = 1 << 20
// maxProxyRequestBody is the largest A2A proxy request body accepted, in
// bytes: 16 << 20, i.e. 16 MiB. ProxyA2A hands it to readBodyWithLimit as the
// limit and answers 413 (Request Entity Too Large) when that read reports an
// error, which includes a body larger than this value.
const maxProxyRequestBody = 16 << 20
// systemCallerPrefixes are caller IDs that bypass workspace access control.
// These are non-workspace internal callers (webhooks, system services, tests).
@@ -90,8 +91,28 @@ func isSystemCaller(callerID string) bool {
return false
}
// maxProxyResponseBody is the maximum size of an A2A proxy response body (10MB).
const maxProxyResponseBody = 10 << 20
// maxProxyResponseBody is the largest agent response body the A2A proxy will
// read, in bytes: 64 << 20, i.e. 64 MiB. proxyA2ARequest hands it to
// readBodyWithLimit as the limit; a response larger than this is reported to
// the caller with truncated=true and max_bytes set to this value.
const maxProxyResponseBody = 64 << 20
// errA2ABodyTooLarge is returned by readBodyWithLimit when a body exceeds the
// configured limit. Callers surface it as a loud 413 / truncated proxy error
// instead of silently cutting the payload. Match it with errors.Is.
var errA2ABodyTooLarge = errors.New("A2A body exceeds size limit")

// readBodyWithLimit reads r to EOF while accepting at most limit bytes.
//
// Parameters:
//   - r:     the body to read.
//   - limit: inclusive maximum size in bytes. A negative value is treated as
//     0, so only an empty body is accepted.
//   - kind:  short label ("request", "response") used in the error message.
//
// Returns:
//   - the full body and a nil error when it fits within limit;
//   - the first limit bytes and an error wrapping errA2ABodyTooLarge when the
//     body is larger than limit, so the caller can fail loud instead of
//     silently truncating;
//   - whatever was read (never more than limit bytes) and the underlying read
//     error when r fails before the limit is exceeded.
func readBodyWithLimit(r io.Reader, limit int, kind string) ([]byte, error) {
	// No body can satisfy a negative limit. Clamp it so the truncating slice
	// expression below can never be asked for a negative bound and panic.
	if limit < 0 {
		limit = 0
	}

	// Ask for one byte more than the limit: receiving that extra byte is the
	// only way to tell "exactly limit bytes" from "more than limit bytes".
	// When limit is already the largest int64 the +1 would wrap negative and
	// io.LimitReader would hand back an empty body with no error, so the
	// probe byte is skipped there (such a limit cannot be exceeded anyway).
	const maxInt64 = 1<<63 - 1
	probe := int64(limit)
	if probe < maxInt64 {
		probe++
	}

	body, err := io.ReadAll(io.LimitReader(r, probe))

	// Check the size before the read error. An io.Reader may return its last
	// bytes together with a non-EOF error; if those bytes pushed the body
	// past the limit, the body is oversize no matter how the stream ended,
	// and the returned slice must still be capped.
	if len(body) > limit {
		return body[:limit], fmt.Errorf("%s body exceeds %d byte limit: %w", kind, limit, errA2ABodyTooLarge)
	}
	if err != nil {
		return body, err
	}
	return body, nil
}
// a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
//
@@ -264,10 +285,16 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
// tSec == 0 means no timeout — use the raw context (no deadline)
}
// Read the incoming request body (capped at 1MB)
body, err := io.ReadAll(io.LimitReader(c.Request.Body, maxProxyRequestBody))
// Read the incoming request body (capped at maxProxyRequestBody). If the
// caller sends a larger body, fail LOUD with 413 instead of silently
// truncating mid-message (core#2677).
body, err := readBodyWithLimit(c.Request.Body, maxProxyRequestBody, "request")
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"})
c.JSON(http.StatusRequestEntityTooLarge, gin.H{
"error": err.Error(),
"truncated": true,
"max_bytes": maxProxyRequestBody,
})
return
}
@@ -565,15 +592,19 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
defer func() { _ = resp.Body.Close() }()
// Read agent response (capped at 10MB).
// Read agent response (capped at maxProxyResponseBody).
// #689: Do() succeeded, which means the target received the request and sent
// back response headers — delivery is confirmed. The body couldn't be
// fully read (connection drop, timeout mid-stream). Surface
// delivery_confirmed so callers can distinguish "not delivered" from
// "delivered, but response body lost". When delivery is confirmed,
// log the activity as successful (delivery happened) rather than leaving
// a false "failed" entry in the audit trail.
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
// fully read (connection drop, timeout mid-stream, OR it exceeded the
// maxProxyResponseBody limit). Surface delivery_confirmed so callers can
// distinguish "not delivered" from "delivered, but response body lost".
// When delivery is confirmed, log the activity as successful (delivery
// happened) rather than leaving a false "failed" entry in the audit trail.
//
// core#2677: readBodyWithLimit detects oversize responses and returns an
// errA2ABodyTooLarge-wrapped error so we surface a loud "truncated" flag
// instead of silently cutting long agent replies.
respBody, readErr := readBodyWithLimit(resp.Body, maxProxyResponseBody, "response")
if readErr != nil {
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
@@ -598,11 +629,17 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
if resp.StatusCode >= 300 {
errStatus = resp.StatusCode
}
errMsg := "failed to read agent response"
if errors.Is(readErr, errA2ABodyTooLarge) {
errMsg = readErr.Error()
}
return resp.StatusCode, respBody, &proxyA2AError{
Status: errStatus,
Response: gin.H{
"error": "failed to read agent response",
"error": errMsg,
"delivery_confirmed": deliveryConfirmed,
"truncated": errors.Is(readErr, errA2ABodyTooLarge),
"max_bytes": maxProxyResponseBody,
},
}
}
@@ -0,0 +1,170 @@
package handlers
// a2a_proxy_truncation_test.go — regression coverage for core#2677.
//
// The A2A proxy caps request/response bodies to bounded sizes. The bug was
// that oversize bodies were silently truncated by io.LimitReader; these tests
// lock in the fix: bodies within the limit pass through intact, bodies over
// the limit fail LOUD with a clear truncated flag and no silent cutting.
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// TestReadBodyWithLimit_UnderLimit checks that a payload shorter than the
// limit comes back byte-for-byte identical and with a nil error.
func TestReadBodyWithLimit_UnderLimit(t *testing.T) {
	want := []byte("small payload")

	got, err := readBodyWithLimit(bytes.NewReader(want), 1024, "request")
	switch {
	case err != nil:
		t.Fatalf("readBodyWithLimit returned unexpected error: %v", err)
	case !bytes.Equal(got, want):
		t.Fatalf("body changed: got %q, want %q", got, want)
	}
}
// TestReadBodyWithLimit_AtLimit checks the inclusive boundary: a payload whose
// length equals the limit is returned intact with a nil error.
func TestReadBodyWithLimit_AtLimit(t *testing.T) {
	// The limit is taken from len(want), so the fixture's text (and its
	// actual length) is irrelevant; only "length == limit" matters.
	want := []byte("exactly-five")
	limit := len(want)

	got, err := readBodyWithLimit(bytes.NewReader(want), limit, "request")
	switch {
	case err != nil:
		t.Fatalf("readBodyWithLimit returned unexpected error: %v", err)
	case !bytes.Equal(got, want):
		t.Fatalf("body changed: got %q, want %q", got, want)
	}
}
// TestReadBodyWithLimit_OverLimit checks the failure path: a payload longer
// than the limit yields an error matching errA2ABodyTooLarge, and the returned
// slice holds exactly the first limit bytes of the input.
func TestReadBodyWithLimit_OverLimit(t *testing.T) {
	const limit = 5
	input := []byte("hello world")

	got, err := readBodyWithLimit(bytes.NewReader(input), limit, "request")

	switch {
	case err == nil:
		t.Fatal("expected error for oversize body, got nil")
	case !errors.Is(err, errA2ABodyTooLarge):
		t.Fatalf("expected errA2ABodyTooLarge, got %v", err)
	}

	if want := input[:limit]; !bytes.Equal(got, want) {
		t.Fatalf("truncated body mismatch: got %q, want %q", got, want)
	}
}
// TestProxyA2A_RequestBodyTooLarge drives ProxyA2A with a body one byte larger
// than maxProxyRequestBody and expects a 413 whose JSON payload carries
// truncated=true, a max_bytes field and an error mentioning "exceeds" — the
// oversize request is rejected loudly rather than cut silently.
func TestProxyA2A_RequestBodyTooLarge(t *testing.T) {
	const wsID = "ws-oversize-req"

	mock := setupTestDB(t)
	setupTestRedis(t)
	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Params = gin.Params{{Key: "id", Value: wsID}}

	// One byte past the cap is the smallest payload that must be rejected.
	tooBig := strings.NewReader(strings.Repeat("A", maxProxyRequestBody+1))
	c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", tooBig)
	c.Request.Header.Set("Content-Type", "application/json")

	handler.ProxyA2A(c)

	if status := rec.Code; status != http.StatusRequestEntityTooLarge {
		t.Fatalf("expected status 413, got %d: %s", status, rec.Body.String())
	}

	var payload map[string]interface{}
	if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
		t.Fatalf("failed to parse response: %v", err)
	}

	// The three response fields are checked independently so a single run
	// reports every missing one.
	if flagged, _ := payload["truncated"].(bool); !flagged {
		t.Errorf("expected truncated=true, got %v", payload["truncated"])
	}
	if _, present := payload["max_bytes"]; !present {
		t.Errorf("expected max_bytes in response, got %v", payload)
	}
	if msg := fmt.Sprint(payload["error"]); !strings.Contains(msg, "exceeds") {
		t.Errorf("expected error to mention limit exceeded, got %v", payload["error"])
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestProxyA2A_LargeRequestWithinLimit sends ProxyA2A a JSON-RPC body that is
// exactly maxProxyRequestBody bytes long and expects a 200 with the stub agent
// having received every byte, i.e. a body at the raised limit is forwarded
// intact instead of being cut at the former 1MB cap.
func TestProxyA2A_LargeRequestWithinLimit(t *testing.T) {
	const wsID = "ws-large-ok"

	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	allowLoopbackForTest(t)
	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())

	// Stub agent: records how many bytes arrived and answers with a minimal
	// JSON-RPC success envelope.
	var forwardedLen int
	agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		payload, _ := io.ReadAll(r.Body)
		forwardedLen = len(payload)
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
	}))
	defer agent.Close()

	// Point the workspace's URL key at the stub agent, then register the DB
	// calls the handler is expected to make.
	mr.Set("ws:"+wsID+":url", agent.URL)
	expectBudgetCheck(mock, wsID)
	mock.ExpectExec("INSERT INTO activity_logs").
		WillReturnResult(sqlmock.NewResult(0, 1))

	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Params = gin.Params{{Key: "id", Value: wsID}}

	// Pad a valid JSON-RPC envelope until it is exactly maxProxyRequestBody
	// bytes. messageId and the canonical v0.3 "kind" discriminator are set up
	// front so normalizeA2APayload does not add fields during forwarding,
	// which would change the length and break the exact-length assertion.
	const (
		head = `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-1","parts":[{"kind":"text","text":"`
		tail = `"}]}}}`
	)
	fill := maxProxyRequestBody - len(head) - len(tail)
	if fill < 0 {
		t.Fatalf("test setup error: prefix+suffix already exceeds maxProxyRequestBody")
	}
	reqBody := head + strings.Repeat("X", fill) + tail

	c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", strings.NewReader(reqBody))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.ProxyA2A(c)
	// NOTE(review): short pause before checking results; presumably gives an
	// asynchronous activity-log write time to run — confirm against ProxyA2A.
	time.Sleep(50 * time.Millisecond)

	if status := rec.Code; status != http.StatusOK {
		t.Fatalf("expected status 200, got %d: %s", status, rec.Body.String())
	}
	if want := len(reqBody); forwardedLen != want {
		t.Errorf("forwarded body length mismatch: got %d, want %d", forwardedLen, want)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}