feat(a2a): canvas cap-and-queue behind A2A_CANVAS_SYNC_BUDGET (default off) — core#2751 #2777

Merged
devops-engineer merged 2 commits from feat/canvas-async-dispatch-flag into main 2026-06-13 21:56:55 +00:00
4 changed files with 175 additions and 9 deletions
@@ -380,6 +380,50 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
}
}
// CANVAS CAP-AND-QUEUE (core#2751, DEFAULT OFF). The canvas→agent POST is
// held for the whole turn; a turn longer than Cloudflare's ~100s edge limit
// returns a 524 (the recurring "Failed to send"). When A2A_CANVAS_SYNC_BUDGET
// > 0, cap the SYNCHRONOUS wait for canvas callers below that limit: if the
// turn hasn't finished by the budget, ack `{status:"queued"}` and let the
// dispatch finish on its own — proxyA2ARequest's dispatch already runs on a
// context.WithoutCancel forward ctx (idle-bounded), so it survives this
// handler returning, and the agent's reply reaches the canvas via the
// AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work
// runs on a detached ctx so its DB logging isn't cancelled when we return.
// Budget=0 (default) → the unchanged synchronous path below; no behavior
// change until an operator opts in. See the design on core#2751.
if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && (callerID == "" || isCanvasUser) {
type a2aResult struct {
status int
body []byte
perr *proxyA2AError
}
detached := context.WithoutCancel(ctx)
done := make(chan a2aResult, 1)
go func() {
s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser)
done <- a2aResult{s, b, pe}
}()
select {
case r := <-done:
if r.perr != nil {
for k, v := range r.perr.Headers {
c.Header(k, v)
}
c.JSON(r.perr.Status, r.perr.Response)
return
}
c.Data(r.status, "application/json", r.body)
return
case <-time.After(budget):
// The turn outlived the sync budget (configured below CF's edge limit) —
// ack queued now; the goroutine finishes on its own and the reply lands
// via WS. The canvas already treats `queued` as "still processing".
c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"})
return
}
}
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true, isCanvasUser)
if proxyErr != nil {
for k, v := range proxyErr.Headers {
@@ -565,7 +609,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
// intent if anyone ever does that), BEFORE resolveAgentURL (mock
// has no URL — going through resolveAgentURL would 404 on the
// SELECT url since the row is provisioned as NULL).
if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, body, a2aMethod, logActivity); handled {
if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, isCanvasUser, body, a2aMethod, logActivity); handled {
return status, respBody, nil
}
@@ -631,7 +675,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
if logActivity && deliveryConfirmed {
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs)
}
// Preserve the actual HTTP status code and any body bytes already read.
// Previously this returned (0, nil, error) which discarded both.
@@ -681,7 +725,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
if logActivity {
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs)
}
// Track LLM token usage for cost transparency (#593).
@@ -358,7 +358,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
// receive the reply without polling.
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
logStatus := "ok"
if statusCode >= 400 {
logStatus = "error"
@@ -429,7 +429,16 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
MessageId: extractMessageIdFromA2ABody(body),
})
if callerID == "" && statusCode < 400 {
// Broadcast A2A_RESPONSE for the CANVAS (so the reply reaches the frontend
// over WS, not just inline) — both the anonymous canvas (callerID == "")
// AND the authenticated canvas user (isCanvasUser, non-empty callerID via
// X-Workspace-ID + validateCallerToken). core#2751: the cap-and-queue path
// returns {queued} for canvas users and depends on THIS broadcast to
// deliver the reply. Safe on the synchronous path too — the canvas already
// receives both the inline HTTP reply and this WS event, and
// appendMessageDeduped collapses them by (role, content, 3s window), which
// is exactly why the anonymous canvas path doesn't double-render today.
if (callerID == "" || isCanvasUser) && statusCode < 400 {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
"response_body": json.RawMessage(respBody),
"method": a2aMethod,
@@ -2336,7 +2336,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
handler.logA2ASuccess(context.Background(), "ws-ok", "", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
time.Sleep(80 * time.Millisecond)
}
@@ -2354,7 +2354,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// callerID != "" with isCanvasUser=false also means no A2A_RESPONSE broadcast.
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", false, []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
time.Sleep(80 * time.Millisecond)
}
@@ -2962,3 +2962,116 @@ func TestInjectCanvasUserIdentity_Nil(t *testing.T) {
}
}
// ==================== ProxyA2A — canvas cap-and-queue (core#2751) ====================
// TestProxyA2A_CanvasCapAndQueue verifies the canvas cap-and-queue path:
// when A2A_CANVAS_SYNC_BUDGET > 0, a canvas turn that outlives the budget is
// ack'd `{status:"queued"}` instead of holding the connection (which CF would
// 524). The dispatch keeps running after the handler returns; the reply is
// expected to reach the canvas over the WebSocket broadcast rather than
// inline. Flag default 0 = unchanged synchronous path (covered by the other
// ProxyA2A tests).
func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
	const (
		// syncBudget is the value exported through A2A_CANVAS_SYNC_BUDGET.
		syncBudget = 100 * time.Millisecond
		// agentDelay is how long the fake agent holds the request. It must be
		// well above syncBudget so the handler is forced to cap-and-queue, and
		// it doubles as the upper bound for the handler's own latency.
		agentDelay = 600 * time.Millisecond
	)

	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	allowLoopbackForTest(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	// Agent that holds the connection PAST the budget (bounded sleep — no
	// deadlock with agentServer.Close()), so the handler must cap-and-queue
	// before the agent ever responds.
	agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(agentDelay)
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, `{"jsonrpc":"2.0","result":{"status":"ok"}}`)
	}))
	defer agentServer.Close()

	mr.Set(fmt.Sprintf("ws:%s:url", "ws-capq"), agentServer.URL)
	expectBudgetCheck(mock, "ws-capq")
	// persistUserMessageAtIngest fires (in the detached goroutine) before the
	// dispatch blocks, so allow the INSERT. Because that goroutine is
	// asynchronous, its ordering relative to this test is not deterministic
	// and ExpectationsWereMet is deliberately not asserted here.
	mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
	t.Setenv("A2A_CANVAS_SYNC_BUDGET", syncBudget.String())

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: "ws-capq"}}
	// Canvas caller: NO X-Workspace-ID header → callerID == "".
	body := `{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"long task"}]}}}`
	c.Request = httptest.NewRequest("POST", "/workspaces/ws-capq/a2a", bytes.NewBufferString(body))
	c.Request.Header.Set("Content-Type", "application/json")

	start := time.Now()
	handler.ProxyA2A(c)
	elapsed := time.Since(start)

	if w.Code != http.StatusOK {
		t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
	}
	if !strings.Contains(w.Body.String(), `"queued"`) {
		t.Errorf("expected queued ack, got: %s", w.Body.String())
	}
	// The handler must return BEFORE the agent could have answered. Bounding
	// by agentDelay (rather than a looser multi-second limit) is what makes
	// this assertion actually prove the cap fired: a handler that waited for
	// the agent would take at least agentDelay.
	if elapsed >= agentDelay {
		t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed)
	}
}
// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: logA2ASuccess
// must emit the A2A_RESPONSE broadcast when it is called with a non-empty
// callerID and isCanvasUser=true (the authenticated canvas user), not only
// for the anonymous canvas (callerID == ""). The cap-and-queue path relies on
// that broadcast to deliver the reply to the frontend.
func TestLogA2ASuccess_BroadcastsForCanvasUser(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	recorder := &recordingBroadcaster{}
	h := NewWorkspaceHandler(recorder, nil, "http://localhost:8080", t.TempDir())
	waitForHandlerAsyncBeforeDBCleanup(t, h)

	// DB traffic expected from the call: the workspace-name lookup followed
	// by the activity log insert.
	mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
		WithArgs("ws-cu").
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Canvas Target"))
	mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))

	// Authenticated canvas user: callerID non-empty, isCanvasUser=true.
	h.logA2ASuccess(context.Background(), "ws-cu", "ws-canvas-user", true, []byte(`{}`), []byte(`{"result":"hi"}`), "message/send", 200, 12)
	// Same settle pause the sibling logA2ASuccess tests use — presumably lets
	// asynchronous work finish before the recorder is inspected.
	time.Sleep(80 * time.Millisecond)

	// Pass as soon as one matching broadcast for this workspace is found.
	for _, call := range recorder.calls {
		if call.eventType == "A2A_RESPONSE" && call.workspaceID == "ws-cu" {
			return
		}
	}
	t.Fatalf("expected A2A_RESPONSE broadcast for authenticated canvas user; recorded: %+v", recorder.calls)
}
// TestLogA2ASuccess_NoBroadcastForWorkspaceCaller is the negative counterpart
// of the canvas-user case: a workspace-to-workspace caller (non-empty
// callerID, isCanvasUser=false) must NOT trigger any A2A_RESPONSE broadcast.
func TestLogA2ASuccess_NoBroadcastForWorkspaceCaller(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	recorder := &recordingBroadcaster{}
	h := NewWorkspaceHandler(recorder, nil, "http://localhost:8080", t.TempDir())
	waitForHandlerAsyncBeforeDBCleanup(t, h)

	// DB traffic expected from the call: the workspace-name lookup followed
	// by the activity log insert.
	mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
		WithArgs("ws-peer").
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Peer"))
	mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))

	h.logA2ASuccess(context.Background(), "ws-peer", "ws-other", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 12)
	// Same settle pause the sibling logA2ASuccess tests use — presumably lets
	// asynchronous work finish before the recorder is inspected.
	time.Sleep(80 * time.Millisecond)

	// Any A2A_RESPONSE event at all is a failure for this caller type.
	for _, call := range recorder.calls {
		if call.eventType != "A2A_RESPONSE" {
			continue
		}
		t.Fatalf("unexpected A2A_RESPONSE broadcast for a workspace-to-workspace caller")
	}
}
@@ -181,7 +181,7 @@ func extractRequestID(body []byte) string {
// mock reply in the trace alongside real-agent traffic. Without this
// the demo would render messages on the canvas chat panel but a peer
// node clicking through to its activity tab would see an empty list.
func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) {
func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) {
if lookupRuntime(ctx, workspaceID) != MockRuntimeName {
return 0, nil, false
}
@@ -204,7 +204,7 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle
// is identical to a real agent reply. Status 200 + duration 0
// is the "synthesised reply" marker; activity_logs.duration_ms
// being 0 is harmless (real fast paths can hit 0 too).
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0)
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, http.StatusOK, 0)
}
return http.StatusOK, respBody, true
}