feat(a2a): canvas cap-and-queue behind A2A_CANVAS_SYNC_BUDGET (default off) — core#2751 #2777
@@ -380,6 +380,50 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// CANVAS CAP-AND-QUEUE (core#2751, DEFAULT OFF). The canvas→agent POST is
|
||||
// held for the whole turn; a turn longer than Cloudflare's ~100s edge limit
|
||||
// returns a 524 (the recurring "Failed to send"). When A2A_CANVAS_SYNC_BUDGET
|
||||
// > 0, cap the SYNCHRONOUS wait for canvas callers below that limit: if the
|
||||
// turn hasn't finished by the budget, ack `{status:"queued"}` and let the
|
||||
// dispatch finish on its own — proxyA2ARequest's dispatch already runs on a
|
||||
// context.WithoutCancel forward ctx (idle-bounded), so it survives this
|
||||
// handler returning, and the agent's reply reaches the canvas via the
|
||||
// AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work
|
||||
// runs on a detached ctx so its DB logging isn't cancelled when we return.
|
||||
// Budget=0 (default) → the unchanged synchronous path below; no behavior
|
||||
// change until an operator opts in. See the design on core#2751.
|
||||
if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && (callerID == "" || isCanvasUser) {
|
||||
type a2aResult struct {
|
||||
status int
|
||||
body []byte
|
||||
perr *proxyA2AError
|
||||
}
|
||||
detached := context.WithoutCancel(ctx)
|
||||
done := make(chan a2aResult, 1)
|
||||
go func() {
|
||||
s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser)
|
||||
done <- a2aResult{s, b, pe}
|
||||
}()
|
||||
select {
|
||||
case r := <-done:
|
||||
if r.perr != nil {
|
||||
for k, v := range r.perr.Headers {
|
||||
c.Header(k, v)
|
||||
}
|
||||
c.JSON(r.perr.Status, r.perr.Response)
|
||||
return
|
||||
}
|
||||
c.Data(r.status, "application/json", r.body)
|
||||
return
|
||||
case <-time.After(budget):
|
||||
// Outlived CF's edge limit — ack queued; the goroutine finishes and
|
||||
// the reply lands via WS. The canvas already treats `queued` as
|
||||
// "still processing" (delivery_mode mirrors poll-mode).
|
||||
c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true, isCanvasUser)
|
||||
if proxyErr != nil {
|
||||
for k, v := range proxyErr.Headers {
|
||||
@@ -565,7 +609,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
// intent if anyone ever does that), BEFORE resolveAgentURL (mock
|
||||
// has no URL — going through resolveAgentURL would 404 on the
|
||||
// SELECT url since the row is provisioned as NULL).
|
||||
if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, body, a2aMethod, logActivity); handled {
|
||||
if status, respBody, handled := h.handleMockA2A(ctx, workspaceID, callerID, isCanvasUser, body, a2aMethod, logActivity); handled {
|
||||
return status, respBody, nil
|
||||
}
|
||||
|
||||
@@ -631,7 +675,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
||||
if logActivity && deliveryConfirmed {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
}
|
||||
// Preserve the actual HTTP status code and any body bytes already read.
|
||||
// Previously this returned (0, nil, error) which discarded both.
|
||||
@@ -681,7 +725,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
}
|
||||
|
||||
// Track LLM token usage for cost transparency (#593).
|
||||
|
||||
@@ -358,7 +358,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
logStatus := "ok"
|
||||
if statusCode >= 400 {
|
||||
logStatus = "error"
|
||||
@@ -429,7 +429,16 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
MessageId: extractMessageIdFromA2ABody(body),
|
||||
})
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
// Broadcast A2A_RESPONSE for the CANVAS (so the reply reaches the frontend
|
||||
// over WS, not just inline) — both the anonymous canvas (callerID == "")
|
||||
// AND the authenticated canvas user (isCanvasUser, non-empty callerID via
|
||||
// X-Workspace-ID + validateCallerToken). core#2751: the cap-and-queue path
|
||||
// returns {queued} for canvas users and depends on THIS broadcast to
|
||||
// deliver the reply. Safe on the synchronous path too — the canvas already
|
||||
// receives both the inline HTTP reply and this WS event, and
|
||||
// appendMessageDeduped collapses them by (role, content, 3s window), which
|
||||
// is exactly why the anonymous canvas path doesn't double-render today.
|
||||
if (callerID == "" || isCanvasUser) && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
|
||||
"response_body": json.RawMessage(respBody),
|
||||
"method": a2aMethod,
|
||||
|
||||
@@ -2336,7 +2336,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@@ -2354,7 +2354,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// callerID != "" also means no A2A_RESPONSE broadcast.
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", false, []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@@ -2962,3 +2962,116 @@ func TestInjectCanvasUserIdentity_Nil(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ==================== ProxyA2A — canvas cap-and-queue (core#2751) ====================
|
||||
|
||||
// When A2A_CANVAS_SYNC_BUDGET > 0, a canvas turn that outlives the budget is
|
||||
// ack'd `{status:"queued"}` instead of holding the connection (which CF would
|
||||
// 524). The dispatch continues on its detached forward ctx; the reply reaches
|
||||
// the canvas via the AGENT_MESSAGE WS broadcast. Flag default 0 = unchanged
|
||||
// synchronous path (covered by the other ProxyA2A tests).
|
||||
func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Agent that holds the connection PAST the budget (bounded sleep — no
|
||||
// deadlock with agentServer.Close()). 600ms >> the 100ms budget, so the
|
||||
// handler must cap-and-queue before the agent ever responds.
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"jsonrpc":"2.0","result":{"status":"ok"}}`)
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", "ws-capq"), agentServer.URL)
|
||||
expectBudgetCheck(mock, "ws-capq")
|
||||
// persistUserMessageAtIngest fires (in the detached goroutine) before the
|
||||
// dispatch blocks — allow the INSERT. The .Maybe()-style tolerance: the
|
||||
// async ordering means we don't assert ExpectationsWereMet strictly here.
|
||||
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "100ms")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-capq"}}
|
||||
// Canvas caller: NO X-Workspace-ID header → callerID == "".
|
||||
body := `{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"long task"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-capq/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
start := time.Now()
|
||||
handler.ProxyA2A(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), `"queued"`) {
|
||||
t.Errorf("expected queued ack, got: %s", w.Body.String())
|
||||
}
|
||||
// Returned at ~budget, NOT after the (blocked) agent — proves the cap fired.
|
||||
if elapsed > 2*time.Second {
|
||||
t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE
|
||||
// WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true,
|
||||
// non-empty callerID via X-Workspace-ID) — not just the anonymous callerID==""
|
||||
// canvas — so the cap-and-queue async reply reaches the frontend. A real
|
||||
// workspace caller (isCanvasUser=false) still gets NO broadcast.
|
||||
func TestLogA2ASuccess_BroadcastsForCanvasUser(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
rec := &recordingBroadcaster{}
|
||||
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-cu").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Canvas Target"))
|
||||
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Authenticated canvas user: callerID non-empty, isCanvasUser=true.
|
||||
handler.logA2ASuccess(context.Background(), "ws-cu", "ws-canvas-user", true, []byte(`{}`), []byte(`{"result":"hi"}`), "message/send", 200, 12)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
got := false
|
||||
for _, c := range rec.calls {
|
||||
if c.eventType == "A2A_RESPONSE" && c.workspaceID == "ws-cu" {
|
||||
got = true
|
||||
}
|
||||
}
|
||||
if !got {
|
||||
t.Fatalf("expected A2A_RESPONSE broadcast for authenticated canvas user; recorded: %+v", rec.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// A real workspace-to-workspace caller (isCanvasUser=false) gets NO A2A_RESPONSE.
|
||||
func TestLogA2ASuccess_NoBroadcastForWorkspaceCaller(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
rec := &recordingBroadcaster{}
|
||||
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-peer").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Peer"))
|
||||
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ASuccess(context.Background(), "ws-peer", "ws-other", false, []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 12)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
for _, c := range rec.calls {
|
||||
if c.eventType == "A2A_RESPONSE" {
|
||||
t.Fatalf("unexpected A2A_RESPONSE broadcast for a workspace-to-workspace caller")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ func extractRequestID(body []byte) string {
|
||||
// mock reply in the trace alongside real-agent traffic. Without this
|
||||
// the demo would render messages on the canvas chat panel but a peer
|
||||
// node clicking through to its activity tab would see an empty list.
|
||||
func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) {
|
||||
func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body []byte, a2aMethod string, logActivity bool) (int, []byte, bool) {
|
||||
if lookupRuntime(ctx, workspaceID) != MockRuntimeName {
|
||||
return 0, nil, false
|
||||
}
|
||||
@@ -204,7 +204,7 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle
|
||||
// is identical to a real agent reply. Status 200 + duration 0
|
||||
// is the "synthesised reply" marker; activity_logs.duration_ms
|
||||
// being 0 is harmless (real fast paths can hit 0 too).
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, http.StatusOK, 0)
|
||||
}
|
||||
return http.StatusOK, respBody, true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user