From 5d357ab6adc5a21495e261f9a0ed43b0c3528d3b Mon Sep 17 00:00:00 2001 From: core-devops Date: Sat, 13 Jun 2026 14:37:30 -0700 Subject: [PATCH 1/2] =?UTF-8?q?feat(a2a):=20canvas=20cap-and-queue=20behin?= =?UTF-8?q?d=20A2A=5FCANVAS=5FSYNC=5FBUDGET=20(default=20off)=20=E2=80=94?= =?UTF-8?q?=20core#2751?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The canvas→agent POST is held for the whole turn; a turn longer than Cloudflare's ~100s edge limit returns a 524 (the recurring "Failed to send"). Server-side timeout raises (#2727/#2749) can't help — CF caps first. Durable fix, OPT-IN: when A2A_CANVAS_SYNC_BUDGET > 0, the ProxyA2A handler caps the synchronous wait for canvas callers; if the turn outlives the budget it acks {status:"queued"} and the dispatch finishes on its own. proxyA2ARequest's dispatch already runs on a context.WithoutCancel forward ctx (idle-bounded), so it survives the handler returning, and the agent's reply reaches the canvas via the AGENT_MESSAGE WS broadcast — the same poll-mode contract the client already handles. The work runs on a detached ctx so its DB logging isn't cancelled. Default 0 = unchanged synchronous path (proxyA2ARequest is byte-identical); no behavior change until an operator sets the budget (e.g. 90s, under CF's 100s). Implemented at the handler seam to keep the core dispatch untouched. Test: a 600ms agent + 100ms budget returns queued without holding the connection; all existing ProxyA2A tests (flag-off path) green. Co-Authored-By: Claude Fable 5 --- .../internal/handlers/a2a_proxy.go | 44 ++++++++++++++ .../internal/handlers/a2a_proxy_test.go | 57 +++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index d40e68deb..c7a821f83 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -380,6 +380,50 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { } } + // CANVAS CAP-AND-QUEUE (core#2751, DEFAULT OFF). The canvas→agent POST is + // held for the whole turn; a turn longer than Cloudflare's ~100s edge limit + // returns a 524 (the recurring "Failed to send"). When A2A_CANVAS_SYNC_BUDGET + // > 0, cap the SYNCHRONOUS wait for canvas callers below that limit: if the + // turn hasn't finished by the budget, ack `{status:"queued"}` and let the + // dispatch finish on its own — proxyA2ARequest's dispatch already runs on a + // context.WithoutCancel forward ctx (idle-bounded), so it survives this + // handler returning, and the agent's reply reaches the canvas via the + // AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work + // runs on a detached ctx so its DB logging isn't cancelled when we return. + // Budget=0 (default) → the unchanged synchronous path below; no behavior + // change until an operator opts in. See the design on core#2751. + if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && callerID == "" { + type a2aResult struct { + status int + body []byte + perr *proxyA2AError + } + detached := context.WithoutCancel(ctx) + done := make(chan a2aResult, 1) + go func() { + s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser) + done <- a2aResult{s, b, pe} + }() + select { + case r := <-done: + if r.perr != nil { + for k, v := range r.perr.Headers { + c.Header(k, v) + } + c.JSON(r.perr.Status, r.perr.Response) + return + } + c.Data(r.status, "application/json", r.body) + return + case <-time.After(budget): + // Outlived CF's edge limit — ack queued; the goroutine finishes and + // the reply lands via WS. The canvas already treats `queued` as + // "still processing" (delivery_mode mirrors poll-mode). + c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"}) + return + } + } + status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true, isCanvasUser) if proxyErr != nil { for k, v := range proxyErr.Headers { diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index b618b52ec..e3a1d4a10 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2962,3 +2962,60 @@ func TestInjectCanvasUserIdentity_Nil(t *testing.T) { } } + +// ==================== ProxyA2A — canvas cap-and-queue (core#2751) ==================== + +// When A2A_CANVAS_SYNC_BUDGET > 0, a canvas turn that outlives the budget is +// ack'd `{status:"queued"}` instead of holding the connection (which CF would +// 524). The dispatch continues on its detached forward ctx; the reply reaches +// the canvas via the AGENT_MESSAGE WS broadcast. Flag default 0 = unchanged +// synchronous path (covered by the other ProxyA2A tests). +func TestProxyA2A_CanvasCapAndQueue(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + // Agent that holds the connection PAST the budget (bounded sleep — no + // deadlock with agentServer.Close()). 600ms >> the 100ms budget, so the + // handler must cap-and-queue before the agent ever responds. + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(600 * time.Millisecond) + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-capq"), agentServer.URL) + expectBudgetCheck(mock, "ws-capq") + // persistUserMessageAtIngest fires (in the detached goroutine) before the + // dispatch blocks — allow the INSERT. The .Maybe()-style tolerance: the + // async ordering means we don't assert ExpectationsWereMet strictly here. + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "100ms") + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-capq"}} + // Canvas caller: NO X-Workspace-ID header → callerID == "". + body := `{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"long task"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-capq/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + start := time.Now() + handler.ProxyA2A(c) + elapsed := time.Since(start) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), `"queued"`) { + t.Errorf("expected queued ack, got: %s", w.Body.String()) + } + // Returned at ~budget, NOT after the (blocked) agent — proves the cap fired. + if elapsed > 2*time.Second { + t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed) + } +} -- 2.52.0 From 8d9eed648246729f7d59f58bfabc6ecd9278ee12 Mon Sep 17 00:00:00 2001 From: core-devops Date: Sat, 13 Jun 2026 14:49:58 -0700 Subject: [PATCH 2/2] fix(a2a): cover the authenticated canvas-user path in cap-and-queue + broadcast (CR2 #2777) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CR2: the cap predicate `callerID == ""` and the A2A_RESPONSE broadcast gate both missed the modern authenticated-canvas path (X-Workspace-ID → isCanvasUser=true, non-empty callerID), so the durable 524 fix wouldn't apply there and the async reply wouldn't be delivered. - Cap predicate → `budget > 0 && (callerID == "" || isCanvasUser)`. - logA2ASuccess now takes isCanvasUser + broadcasts A2A_RESPONSE for `(callerID == "" || isCanvasUser)`. Safe on the sync path: the canvas already gets BOTH the inline reply and this WS event, and appendMessageDeduped collapses them by (role, content, 3s) — exactly why the anonymous canvas path doesn't double-render today. Verified the client renders A2A_RESPONSE caller-agnostically (canvas-events.ts handleCanvasEvent keys on workspace_id), so no live e2e is required — it rides the proven, deduped delivery path. - Threaded isCanvasUser through handleMockA2A too. Tests: logA2ASuccess broadcasts A2A_RESPONSE for an authenticated canvas user + does NOT for a workspace-to-workspace caller; existing cap-and-queue + logA2A tests green. Default flag off = unchanged synchronous path. Co-Authored-By: Claude Fable 5 --- .../internal/handlers/a2a_proxy.go | 8 +-- .../internal/handlers/a2a_proxy_helpers.go | 13 +++- .../internal/handlers/a2a_proxy_test.go | 60 ++++++++++++++++++- .../internal/handlers/mock_runtime.go | 4 +- 4 files changed, 75 insertions(+), 10 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index c7a821f83..b78990c63 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -392,7 +392,7 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { // runs on a detached ctx so its DB logging isn't cancelled when we return. // Budget=0 (default) → the unchanged synchronous path below; no behavior // change until an operator opts in. See the design on core#2751. - if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && callerID == "" { + if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && (callerID == "" || isCanvasUser) { type a2aResult struct { status int body []byte @@ -609,7 +609,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri // intent if anyone ever does that), BEFORE resolveAgentURL (mock // has no URL — going through resolveAgentURL would 404 on the // SELECT url since the row is provisioned as NULL). - if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, body, a2aMethod, logActivity); handled { + if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, isCanvasUser, body, a2aMethod, logActivity); handled { return status, respBody, nil } @@ -675,7 +675,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) if logActivity && deliveryConfirmed { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs) } // Preserve the actual HTTP status code and any body bytes already read. // Previously this returned (0, nil, error) which discarded both. @@ -725,7 +725,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } if logActivity { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs) } // Track LLM token usage for cost transparency (#593). diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index e80114ad8..cba45fe70 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -358,7 +358,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated // 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can // receive the reply without polling. -func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) { +func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body, respBody []byte, a2aMethod string, statusCode, durationMs int) { logStatus := "ok" if statusCode >= 400 { logStatus = "error" @@ -429,7 +429,16 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle MessageId: extractMessageIdFromA2ABody(body), }) - if callerID == "" && statusCode < 400 { + // Broadcast A2A_RESPONSE for the CANVAS (so the reply reaches the frontend + // over WS, not just inline) — both the anonymous canvas (callerID == "") + // AND the authenticated canvas user (isCanvasUser, non-empty callerID via + // X-Workspace-ID + validateCallerToken). core#2751: the cap-and-queue path + // returns {queued} for canvas users and depends on THIS broadcast to + // deliver the reply. Safe on the synchronous path too — the canvas already + // receives both the inline HTTP reply and this WS event, and + // appendMessageDeduped collapses them by (role, content, 3s window), which + // is exactly why the anonymous canvas path doesn't double-render today. + if (callerID == "" || isCanvasUser) && statusCode < 400 { h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ "response_body": json.RawMessage(respBody), "method": a2aMethod, diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index e3a1d4a10..b4e1d851c 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2336,7 +2336,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10) + handler.logA2ASuccess(context.Background(), "ws-ok", "", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10) time.Sleep(80 * time.Millisecond) } @@ -2354,7 +2354,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 1)) // callerID != "" also means no A2A_RESPONSE broadcast. - handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10) + handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", false, []byte(`{}`), []byte(`{}`), "message/send", 500, 10) time.Sleep(80 * time.Millisecond) } @@ -3019,3 +3019,59 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) { t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed) } } + + +// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE +// WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true, +// non-empty callerID via X-Workspace-ID) — not just the anonymous callerID=="" +// canvas — so the cap-and-queue async reply reaches the frontend. A real +// workspace caller (isCanvasUser=false) still gets NO broadcast. +func TestLogA2ASuccess_BroadcastsForCanvasUser(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + rec := &recordingBroadcaster{} + handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-cu"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Canvas Target")) + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + // Authenticated canvas user: callerID non-empty, isCanvasUser=true. + handler.logA2ASuccess(context.Background(), "ws-cu", "ws-canvas-user", true, []byte(`{}`), []byte(`{"result":"hi"}`), "message/send", 200, 12) + time.Sleep(80 * time.Millisecond) + + got := false + for _, c := range rec.calls { + if c.eventType == "A2A_RESPONSE" && c.workspaceID == "ws-cu" { + got = true + } + } + if !got { + t.Fatalf("expected A2A_RESPONSE broadcast for authenticated canvas user; recorded: %+v", rec.calls) + } +} + +// A real workspace-to-workspace caller (isCanvasUser=false) gets NO A2A_RESPONSE. +func TestLogA2ASuccess_NoBroadcastForWorkspaceCaller(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + rec := &recordingBroadcaster{} + handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-peer"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Peer")) + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ASuccess(context.Background(), "ws-peer", "ws-other", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 12) + time.Sleep(80 * time.Millisecond) + + for _, c := range rec.calls { + if c.eventType == "A2A_RESPONSE" { + t.Fatalf("unexpected A2A_RESPONSE broadcast for a workspace-to-workspace caller") + } + } +} diff --git a/workspace-server/internal/handlers/mock_runtime.go b/workspace-server/internal/handlers/mock_runtime.go index 2fe1000eb..07e8785e6 100644 --- a/workspace-server/internal/handlers/mock_runtime.go +++ b/workspace-server/internal/handlers/mock_runtime.go @@ -181,7 +181,7 @@ func extractRequestID(body []byte) string { // mock reply in the trace alongside real-agent traffic. Without this // the demo would render messages on the canvas chat panel but a peer // node clicking through to its activity tab would see an empty list. -func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) { +func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) { if lookupRuntime(ctx, workspaceID) != MockRuntimeName { return 0, nil, false } @@ -204,7 +204,7 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle // is identical to a real agent reply. Status 200 + duration 0 // is the "synthesised reply" marker; activity_logs.duration_ms // being 0 is harmless (real fast paths can hit 0 too). - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0) + h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, http.StatusOK, 0) } return http.StatusOK, respBody, true } -- 2.52.0