fix(a2a): detect and fail loud on proxy body truncation (core#2677) #2681
@@ -11,6 +11,7 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
@@ -73,8 +74,8 @@ func setPlatformInDockerForTest(v bool) func() {
|
||||
return func() { platformInDocker = prev }
|
||||
}
|
||||
|
||||
// maxProxyRequestBody is the maximum size of an A2A proxy request body (1MB).
|
||||
const maxProxyRequestBody = 1 << 20
|
||||
// maxProxyRequestBody is the maximum size of an A2A proxy request body (16MB).
|
||||
const maxProxyRequestBody = 16 << 20
|
||||
|
||||
// systemCallerPrefixes are caller IDs that bypass workspace access control.
|
||||
// These are non-workspace internal callers (webhooks, system services, tests).
|
||||
@@ -90,8 +91,28 @@ func isSystemCaller(callerID string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// maxProxyResponseBody is the maximum size of an A2A proxy response body (10MB).
|
||||
const maxProxyResponseBody = 10 << 20
|
||||
// maxProxyResponseBody is the maximum size of an A2A proxy response body (64MB).
|
||||
const maxProxyResponseBody = 64 << 20
|
||||
|
||||
// errA2ABodyTooLarge is returned by readBodyWithLimit when a body exceeds the
|
||||
// configured limit. Callers surface it as a loud 413 / truncated proxy error
|
||||
// instead of silently cutting the payload.
|
||||
var errA2ABodyTooLarge = errors.New("A2A body exceeds size limit")
|
||||
|
||||
// readBodyWithLimit reads up to limit bytes from r. It returns an error
|
||||
// (wrapping errA2ABodyTooLarge) when the input is larger than limit so the
|
||||
// caller can fail loud instead of silently truncating. The returned body is
|
||||
// capped at limit bytes; on truncation it contains the first limit bytes read.
|
||||
func readBodyWithLimit(r io.Reader, limit int, kind string) ([]byte, error) {
|
||||
body, err := io.ReadAll(io.LimitReader(r, int64(limit)+1))
|
||||
if err != nil {
|
||||
return body, err
|
||||
}
|
||||
if len(body) > limit {
|
||||
return body[:limit], fmt.Errorf("%s body exceeds %d byte limit: %w", kind, limit, errA2ABodyTooLarge)
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
|
||||
//
|
||||
@@ -264,10 +285,16 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
// tSec == 0 means no timeout — use the raw context (no deadline)
|
||||
}
|
||||
|
||||
// Read the incoming request body (capped at 1MB)
|
||||
body, err := io.ReadAll(io.LimitReader(c.Request.Body, maxProxyRequestBody))
|
||||
// Read the incoming request body (capped at maxProxyRequestBody). If the
|
||||
// caller sends a larger body, fail LOUD with 413 instead of silently
|
||||
// truncating mid-message (core#2677).
|
||||
body, err := readBodyWithLimit(c.Request.Body, maxProxyRequestBody, "request")
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"})
|
||||
c.JSON(http.StatusRequestEntityTooLarge, gin.H{
|
||||
"error": err.Error(),
|
||||
"truncated": true,
|
||||
"max_bytes": maxProxyRequestBody,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -565,15 +592,19 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// Read agent response (capped at 10MB).
|
||||
// Read agent response (capped at maxProxyResponseBody).
|
||||
// #689: Do() succeeded, which means the target received the request and sent
|
||||
// back response headers — delivery is confirmed. The body couldn't be
|
||||
// fully read (connection drop, timeout mid-stream). Surface
|
||||
// delivery_confirmed so callers can distinguish "not delivered" from
|
||||
// "delivered, but response body lost". When delivery is confirmed,
|
||||
// log the activity as successful (delivery happened) rather than leaving
|
||||
// a false "failed" entry in the audit trail.
|
||||
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
// fully read (connection drop, timeout mid-stream, OR it exceeded the
|
||||
// maxProxyResponseBody limit). Surface delivery_confirmed so callers can
|
||||
// distinguish "not delivered" from "delivered, but response body lost".
|
||||
// When delivery is confirmed, log the activity as successful (delivery
|
||||
// happened) rather than leaving a false "failed" entry in the audit trail.
|
||||
//
|
||||
// core#2677: readBodyWithLimit detects oversize responses and returns an
|
||||
// errA2ABodyTooLarge-wrapped error so we surface a loud "truncated" flag
|
||||
// instead of silently cutting long agent replies.
|
||||
respBody, readErr := readBodyWithLimit(resp.Body, maxProxyResponseBody, "response")
|
||||
if readErr != nil {
|
||||
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
@@ -598,11 +629,17 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
if resp.StatusCode >= 300 {
|
||||
errStatus = resp.StatusCode
|
||||
}
|
||||
errMsg := "failed to read agent response"
|
||||
if errors.Is(readErr, errA2ABodyTooLarge) {
|
||||
errMsg = readErr.Error()
|
||||
}
|
||||
return resp.StatusCode, respBody, &proxyA2AError{
|
||||
Status: errStatus,
|
||||
Response: gin.H{
|
||||
"error": "failed to read agent response",
|
||||
"error": errMsg,
|
||||
"delivery_confirmed": deliveryConfirmed,
|
||||
"truncated": errors.Is(readErr, errA2ABodyTooLarge),
|
||||
"max_bytes": maxProxyResponseBody,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,170 @@
|
||||
package handlers
|
||||
|
||||
// a2a_proxy_truncation_test.go — regression coverage for core#2677.
|
||||
//
|
||||
// The A2A proxy caps request/response bodies to bounded sizes. The bug was
|
||||
// that oversize bodies were silently truncated by io.LimitReader; these tests
|
||||
// lock in the fix: bodies within the limit pass through intact, bodies over
|
||||
// the limit fail LOUD with a clear truncated flag and no silent cutting.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestReadBodyWithLimit_UnderLimit proves a body smaller than the limit is
|
||||
// returned unchanged and without error.
|
||||
func TestReadBodyWithLimit_UnderLimit(t *testing.T) {
|
||||
body := []byte("small payload")
|
||||
got, err := readBodyWithLimit(bytes.NewReader(body), 1024, "request")
|
||||
if err != nil {
|
||||
t.Fatalf("readBodyWithLimit returned unexpected error: %v", err)
|
||||
}
|
||||
if !bytes.Equal(got, body) {
|
||||
t.Fatalf("body changed: got %q, want %q", got, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReadBodyWithLimit_AtLimit proves a body exactly at the limit is accepted
|
||||
// (the limit is an inclusive maximum).
|
||||
func TestReadBodyWithLimit_AtLimit(t *testing.T) {
|
||||
body := []byte("exactly-five")
|
||||
got, err := readBodyWithLimit(bytes.NewReader(body), len(body), "request")
|
||||
if err != nil {
|
||||
t.Fatalf("readBodyWithLimit returned unexpected error: %v", err)
|
||||
}
|
||||
if !bytes.Equal(got, body) {
|
||||
t.Fatalf("body changed: got %q, want %q", got, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReadBodyWithLimit_OverLimit proves an oversize body returns the first
|
||||
// limit bytes AND an errA2ABodyTooLarge-wrapped error so callers can fail loud.
|
||||
func TestReadBodyWithLimit_OverLimit(t *testing.T) {
|
||||
body := []byte("hello world")
|
||||
limit := 5
|
||||
got, err := readBodyWithLimit(bytes.NewReader(body), limit, "request")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for oversize body, got nil")
|
||||
}
|
||||
if !errors.Is(err, errA2ABodyTooLarge) {
|
||||
t.Fatalf("expected errA2ABodyTooLarge, got %v", err)
|
||||
}
|
||||
want := body[:limit]
|
||||
if !bytes.Equal(got, want) {
|
||||
t.Fatalf("truncated body mismatch: got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_RequestBodyTooLarge proves the public proxy endpoint returns
|
||||
// 413 Payload Too Large with a truncated flag instead of silently cutting a
|
||||
// >maxProxyRequestBody payload.
|
||||
func TestProxyA2A_RequestBodyTooLarge(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-oversize-req"}}
|
||||
|
||||
// maxProxyRequestBody+1 bytes guarantees truncation detection.
|
||||
oversize := strings.Repeat("A", maxProxyRequestBody+1)
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-oversize-req/a2a", strings.NewReader(oversize))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.ProxyA2A(c)
|
||||
|
||||
if w.Code != http.StatusRequestEntityTooLarge {
|
||||
t.Fatalf("expected status 413, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
if resp["truncated"] != true {
|
||||
t.Errorf("expected truncated=true, got %v", resp["truncated"])
|
||||
}
|
||||
if _, ok := resp["max_bytes"]; !ok {
|
||||
t.Errorf("expected max_bytes in response, got %v", resp)
|
||||
}
|
||||
if !strings.Contains(fmt.Sprint(resp["error"]), "exceeds") {
|
||||
t.Errorf("expected error to mention limit exceeded, got %v", resp["error"])
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_LargeRequestWithinLimit proves a body at the (raised) request
|
||||
// limit is accepted and forwarded intact, closing the original 1MB silent-cut
|
||||
// gap for spec-length delegations.
|
||||
func TestProxyA2A_LargeRequestWithinLimit(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
var receivedLen int
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
receivedBody, _ := io.ReadAll(r.Body)
|
||||
receivedLen = len(receivedBody)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", "ws-large-ok"), agentServer.URL)
|
||||
expectBudgetCheck(mock, "ws-large-ok")
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-large-ok"}}
|
||||
|
||||
// Build a valid JSON-RPC body just under the new 16MB cap.
|
||||
// Include messageId and use the canonical v0.3 "kind" discriminator so
|
||||
// normalizeA2APayload does not add fields during forwarding (which would
|
||||
// change the body length and break the exact-length assertion).
|
||||
prefix := `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-1","parts":[{"kind":"text","text":"`
|
||||
suffix := `"}]}}}`
|
||||
paddingLen := maxProxyRequestBody - len(prefix) - len(suffix)
|
||||
if paddingLen < 0 {
|
||||
t.Fatalf("test setup error: prefix+suffix already exceeds maxProxyRequestBody")
|
||||
}
|
||||
largeBody := prefix + strings.Repeat("X", paddingLen) + suffix
|
||||
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-large-ok/a2a", strings.NewReader(largeBody))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.ProxyA2A(c)
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if receivedLen != len(largeBody) {
|
||||
t.Errorf("forwarded body length mismatch: got %d, want %d", receivedLen, len(largeBody))
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user