diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index d40e68deb..b78990c63 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -380,6 +380,50 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { } } + // CANVAS CAP-AND-QUEUE (core#2751, DEFAULT OFF). The canvas→agent POST is + // held for the whole turn; a turn longer than Cloudflare's ~100s edge limit + // returns a 524 (the recurring "Failed to send"). When A2A_CANVAS_SYNC_BUDGET + // > 0, cap the SYNCHRONOUS wait for canvas callers below that limit: if the + // turn hasn't finished by the budget, ack `{status:"queued"}` and let the + // dispatch finish on its own — proxyA2ARequest's dispatch already runs on a + // context.WithoutCancel forward ctx (idle-bounded), so it survives this + // handler returning, and the agent's reply reaches the canvas via the + // AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work + // runs on a detached ctx so its DB logging isn't cancelled when we return. + // Budget=0 (default) → the unchanged synchronous path below; no behavior + // change until an operator opts in. See the design on core#2751. + if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && (callerID == "" || isCanvasUser) { + type a2aResult struct { + status int + body []byte + perr *proxyA2AError + } + detached := context.WithoutCancel(ctx) + done := make(chan a2aResult, 1) + go func() { + s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser) + done <- a2aResult{s, b, pe} + }() + select { + case r := <-done: + if r.perr != nil { + for k, v := range r.perr.Headers { + c.Header(k, v) + } + c.JSON(r.perr.Status, r.perr.Response) + return + } + c.Data(r.status, "application/json", r.body) + return + case <-time.After(budget): + // Outlived CF's edge limit — ack queued; the goroutine finishes and + // the reply lands via WS. The canvas already treats `queued` as + // "still processing" (delivery_mode mirrors poll-mode). + c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"}) + return + } + } + status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true, isCanvasUser) if proxyErr != nil { for k, v := range proxyErr.Headers { @@ -565,7 +609,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri // intent if anyone ever does that), BEFORE resolveAgentURL (mock // has no URL — going through resolveAgentURL would 404 on the // SELECT url since the row is provisioned as NULL). - if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, body, a2aMethod, logActivity); handled { + if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, isCanvasUser, body, a2aMethod, logActivity); handled { return status, respBody, nil } @@ -631,7 +675,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) if logActivity && deliveryConfirmed { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs) } // Preserve the actual HTTP status code and any body bytes already read. // Previously this returned (0, nil, error) which discarded both. @@ -681,7 +725,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } if logActivity { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs) } // Track LLM token usage for cost transparency (#593). diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index e80114ad8..cba45fe70 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -358,7 +358,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated // 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can // receive the reply without polling. -func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) { +func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body, respBody []byte, a2aMethod string, statusCode, durationMs int) { logStatus := "ok" if statusCode >= 400 { logStatus = "error" @@ -429,7 +429,16 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle MessageId: extractMessageIdFromA2ABody(body), }) - if callerID == "" && statusCode < 400 { + // Broadcast A2A_RESPONSE for the CANVAS (so the reply reaches the frontend + // over WS, not just inline) — both the anonymous canvas (callerID == "") + // AND the authenticated canvas user (isCanvasUser, non-empty callerID via + // X-Workspace-ID + validateCallerToken). core#2751: the cap-and-queue path + // returns {queued} for canvas users and depends on THIS broadcast to + // deliver the reply. Safe on the synchronous path too — the canvas already + // receives both the inline HTTP reply and this WS event, and + // appendMessageDeduped collapses them by (role, content, 3s window), which + // is exactly why the anonymous canvas path doesn't double-render today. + if (callerID == "" || isCanvasUser) && statusCode < 400 { h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ "response_body": json.RawMessage(respBody), "method": a2aMethod, diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index b618b52ec..b4e1d851c 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2336,7 +2336,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10) + handler.logA2ASuccess(context.Background(), "ws-ok", "", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10) time.Sleep(80 * time.Millisecond) } @@ -2354,7 +2354,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 1)) // callerID != "" also means no A2A_RESPONSE broadcast. - handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10) + handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", false, []byte(`{}`), []byte(`{}`), "message/send", 500, 10) time.Sleep(80 * time.Millisecond) } @@ -2962,3 +2962,116 @@ func TestInjectCanvasUserIdentity_Nil(t *testing.T) { } } + +// ==================== ProxyA2A — canvas cap-and-queue (core#2751) ==================== + +// When A2A_CANVAS_SYNC_BUDGET > 0, a canvas turn that outlives the budget is +// ack'd `{status:"queued"}` instead of holding the connection (which CF would +// 524). The dispatch continues on its detached forward ctx; the reply reaches +// the canvas via the AGENT_MESSAGE WS broadcast. Flag default 0 = unchanged +// synchronous path (covered by the other ProxyA2A tests). +func TestProxyA2A_CanvasCapAndQueue(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + // Agent that holds the connection PAST the budget (bounded sleep — no + // deadlock with agentServer.Close()). 600ms >> the 100ms budget, so the + // handler must cap-and-queue before the agent ever responds. + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(600 * time.Millisecond) + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-capq"), agentServer.URL) + expectBudgetCheck(mock, "ws-capq") + // persistUserMessageAtIngest fires (in the detached goroutine) before the + // dispatch blocks — allow the INSERT. The .Maybe()-style tolerance: the + // async ordering means we don't assert ExpectationsWereMet strictly here. + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "100ms") + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-capq"}} + // Canvas caller: NO X-Workspace-ID header → callerID == "". + body := `{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"long task"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-capq/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + start := time.Now() + handler.ProxyA2A(c) + elapsed := time.Since(start) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), `"queued"`) { + t.Errorf("expected queued ack, got: %s", w.Body.String()) + } + // Returned at ~budget, NOT after the (blocked) agent — proves the cap fired. + if elapsed > 2*time.Second { + t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed) + } +} + + +// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE +// WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true, +// non-empty callerID via X-Workspace-ID) — not just the anonymous callerID=="" +// canvas — so the cap-and-queue async reply reaches the frontend. A real +// workspace caller (isCanvasUser=false) still gets NO broadcast. +func TestLogA2ASuccess_BroadcastsForCanvasUser(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + rec := &recordingBroadcaster{} + handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-cu"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Canvas Target")) + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + // Authenticated canvas user: callerID non-empty, isCanvasUser=true. + handler.logA2ASuccess(context.Background(), "ws-cu", "ws-canvas-user", true, []byte(`{}`), []byte(`{"result":"hi"}`), "message/send", 200, 12) + time.Sleep(80 * time.Millisecond) + + got := false + for _, c := range rec.calls { + if c.eventType == "A2A_RESPONSE" && c.workspaceID == "ws-cu" { + got = true + } + } + if !got { + t.Fatalf("expected A2A_RESPONSE broadcast for authenticated canvas user; recorded: %+v", rec.calls) + } +} + +// A real workspace-to-workspace caller (isCanvasUser=false) gets NO A2A_RESPONSE. +func TestLogA2ASuccess_NoBroadcastForWorkspaceCaller(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + rec := &recordingBroadcaster{} + handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-peer"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Peer")) + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ASuccess(context.Background(), "ws-peer", "ws-other", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 12) + time.Sleep(80 * time.Millisecond) + + for _, c := range rec.calls { + if c.eventType == "A2A_RESPONSE" { + t.Fatalf("unexpected A2A_RESPONSE broadcast for a workspace-to-workspace caller") + } + } +} diff --git a/workspace-server/internal/handlers/mock_runtime.go b/workspace-server/internal/handlers/mock_runtime.go index 2fe1000eb..07e8785e6 100644 --- a/workspace-server/internal/handlers/mock_runtime.go +++ b/workspace-server/internal/handlers/mock_runtime.go @@ -181,7 +181,7 @@ func extractRequestID(body []byte) string { // mock reply in the trace alongside real-agent traffic. Without this // the demo would render messages on the canvas chat panel but a peer // node clicking through to its activity tab would see an empty list. -func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) { +func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) { if lookupRuntime(ctx, workspaceID) != MockRuntimeName { return 0, nil, false } @@ -204,7 +204,7 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle // is identical to a real agent reply. Status 200 + duration 0 // is the "synthesised reply" marker; activity_logs.duration_ms // being 0 is harmless (real fast paths can hit 0 too). - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0) + h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, http.StatusOK, 0) } return http.StatusOK, respBody, true }