fix(workspace-server#2751): make canvas cap-and-queue default-ON at 90s #2800

Merged
devops-engineer merged 4 commits from fix/2751-canvas-async-dispatch into main 2026-06-14 01:37:51 +00:00
3 changed files with 351 additions and 6 deletions
+16
View File
@@ -37,3 +37,19 @@ func Int64(name string, def int64) int64 {
}
return def
}
// Bool reads the environment variable `name` and interprets it as a boolean.
//
// Parsing is delegated to strconv.ParseBool, so the recognised spellings are:
//   - true:  1, t, T, TRUE, true, True
//   - false: 0, f, F, FALSE, false, False
//
// def is returned when the variable is unset or empty (os.Getenv does not
// distinguish the two) and when the value is not one of the spellings above —
// "yes", "on", "y" and "n" are all rejected. For a feature flag this means a
// mistyped value leaves the flag at its default rather than silently
// switching the feature on.
func Bool(name string, def bool) bool {
	raw := os.Getenv(name)
	if raw == "" {
		// Unset and empty are indistinguishable here; both keep the default.
		return def
	}
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		// Unrecognised spelling: fall back instead of guessing the intent.
		return def
	}
	return parsed
}
@@ -380,25 +380,40 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
}
}
// CANVAS CAP-AND-QUEUE (core#2751, DEFAULT OFF). The canvas→agent POST is
// CANVAS CAP-AND-QUEUE (core#2751, DEFAULT ON). The canvas→agent POST is
// held for the whole turn; a turn longer than Cloudflare's ~100s edge limit
// returns a 524 (the recurring "Failed to send"). When A2A_CANVAS_SYNC_BUDGET
// > 0, cap the SYNCHRONOUS wait for canvas callers below that limit: if the
// returns a 524 (the recurring "Failed to send"). By default we cap the
// SYNCHRONOUS wait for canvas callers at 90s (just under CF's edge): if the
// turn hasn't finished by the budget, ack `{status:"queued"}` and let the
// dispatch finish on its own — proxyA2ARequest's dispatch already runs on a
// context.WithoutCancel forward ctx (idle-bounded), so it survives this
// handler returning, and the agent's reply reaches the canvas via the
// AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work
// runs on a detached ctx so its DB logging isn't cancelled when we return.
// Budget=0 (default) → the unchanged synchronous path below; no behavior
// change until an operator opts in. See the design on core#2751.
if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && (callerID == "" || isCanvasUser) {
//
// Operators can tune the budget via the env var (e.g. 60s for more
// conservative environments). The default of 90s is the durable fix that
// removes the CF 100s ceiling for the canvas path. envx.Duration treats
// 0/negative as "not set" and falls through to the default — so the budget
// variable alone cannot disable the cap; use the A2A_CANVAS_SYNC_DISABLE
// runtime kill-switch described below instead (no code change needed).
// See the design on core#2751.
//
// The budget lookup is extracted into canvasA2ASyncBudget (below) so the
// default value is unit-testable without source-string matching.
//
// Runtime kill-switch (core#2751 RC #11552): canvasA2ASyncDisabled()
// is the explicit opt-out that takes precedence over the budget value.
// When set, the entire cap-and-queue goroutine is skipped and the
// legacy synchronous path runs. Ops can flip this at runtime to
// disable the async path if it misbehaves in prod.
if !canvasA2ASyncDisabled() && canvasA2ASyncBudget() > 0 && (callerID == "" || isCanvasUser) {
type a2aResult struct {
status int
body []byte
perr *proxyA2AError
}
detached := context.WithoutCancel(ctx)
budget := canvasA2ASyncBudget() // local copy for the time.After below
done := make(chan a2aResult, 1)
go func() {
s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser)
@@ -1214,3 +1229,41 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
}()
return ctx, cancel
}
// canvasA2ASyncBudget returns how long ProxyA2A waits synchronously for a
// canvas turn before acking it as queued (cap-and-queue, core#2751).
//
// It lives in its own function so the default can be unit-tested directly
// rather than by matching source text: a regression of the default to 0 would
// bring back the legacy always-sync behavior and, with it, the 524 +
// WS-starvation failure class on the canvas path.
//
// The default is 90s, chosen to sit just under Cloudflare's ~100s edge limit
// so the queued ack goes out before CF drops the request.
//
// Operators can tune the value with A2A_CANVAS_SYNC_BUDGET (for example 60s
// in more conservative environments) without a code change. envx.Duration is
// documented to treat 0, negative and invalid values as "not set", in which
// case the default applies.
//
// Runtime kill-switch (core#2751 RC #11552): because a zero budget falls
// through to the default, the budget variable cannot turn the cap off. That
// is the job of the separate A2A_CANVAS_SYNC_DISABLE variable (any truthy
// envx.Bool value): when it is set, ProxyA2A skips the cap-and-queue
// goroutine and takes the legacy synchronous path whatever the budget is,
// so ops can back the async path out without a deploy.
func canvasA2ASyncBudget() time.Duration {
	const (
		budgetEnv     = "A2A_CANVAS_SYNC_BUDGET"
		defaultBudget = 90 * time.Second
	)
	return envx.Duration(budgetEnv, defaultBudget)
}
// canvasA2ASyncDisabled reports whether the cap-and-queue async-dispatch path
// has been switched off at runtime.
//
// It is true only when A2A_CANVAS_SYNC_DISABLE holds a value envx.Bool
// accepts as truthy (1, t, T, true, TRUE); 0, f, F, false, an empty value or
// an unrecognised one all leave the cap enabled. When it is true, ProxyA2A
// bypasses the cap-and-queue goroutine and runs the legacy synchronous path.
//
// Kept as a separate function so the kill-switch default can be unit-tested
// independently of the budget value.
func canvasA2ASyncDisabled() bool {
	const (
		killSwitchEnv   = "A2A_CANVAS_SYNC_DISABLE"
		disabledDefault = false
	)
	return envx.Bool(killSwitchEnv, disabledDefault)
}
@@ -3020,6 +3020,282 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
}
}
// TestCanvasA2ASyncBudget_DefaultIs90s pins the core#2751 durable fix at the
// unit level: with no budget configured, canvasA2ASyncBudget must return 90s
// (just under Cloudflare's ~100s edge limit). A regression to 0 — the legacy
// always-sync value — would re-expose the canvas path to the
// 524+WS-starvation class. The function is called directly, so no ProxyA2A
// integration setup is needed.
func TestCanvasA2ASyncBudget_DefaultIs90s(t *testing.T) {
	// An empty value stands in for "unset"; the test relies on envx.Duration
	// falling through to the default for it.
	t.Setenv("A2A_CANVAS_SYNC_BUDGET", "")

	const want = 90 * time.Second
	got := canvasA2ASyncBudget()

	// The positivity check runs first. Placed after the equality check it
	// could never fire (any non-90s value would already have failed), so the
	// more specific "legacy path re-enabled" diagnostic was unreachable.
	if got <= 0 {
		t.Fatalf("canvasA2ASyncBudget() = %v, must be > 0 (a non-positive default would re-enable the legacy always-sync path that causes 524+WS-starvation)", got)
	}
	if got != want {
		t.Fatalf("canvasA2ASyncBudget() = %v, want %v — default regression on the core#2751 durable fix (regression to 0 would re-expose canvas to CF 524)", got, want)
	}
}
// TestCanvasA2ASyncBudget_EnvOverride exercises the operator tuning path for
// A2A_CANVAS_SYNC_BUDGET: a valid positive duration is used as-is, while an
// unparseable value and a literal 0 both yield the 90s default.
//
// A zero budget is therefore not a way to turn the cap off — that is what the
// separate A2A_CANVAS_SYNC_DISABLE kill-switch is for (covered by
// TestCanvasA2ASyncDisabled).
func TestCanvasA2ASyncBudget_EnvOverride(t *testing.T) {
	const fallback = 90 * time.Second

	// Each row: raw env value, expected budget, and the failure format used
	// when the budget does not match.
	scenarios := []struct {
		raw     string
		want    time.Duration
		failFmt string
	}{
		{"60s", 60 * time.Second, "A2A_CANVAS_SYNC_BUDGET=60s should set the cap to 60s; got %v"},
		{"120s", 120 * time.Second, "A2A_CANVAS_SYNC_BUDGET=120s should set the cap to 120s; got %v"},
		{"invalid", fallback, "invalid A2A_CANVAS_SYNC_BUDGET should fall back to the 90s default; got %v"},
		{"0", fallback, "A2A_CANVAS_SYNC_BUDGET=0 should fall back to the 90s default (envx treats 0 as not-set); got %v"},
	}

	for _, sc := range scenarios {
		t.Setenv("A2A_CANVAS_SYNC_BUDGET", sc.raw)
		got := canvasA2ASyncBudget()
		if got == sc.want {
			continue
		}
		t.Errorf(sc.failFmt, got)
	}
}
// TestCanvasA2ASyncDisabled pins the runtime kill-switch (core#2751 RC
// #11552). A truthy A2A_CANVAS_SYNC_DISABLE must report "disabled", which
// sends the canvas down the legacy synchronous path regardless of the budget;
// an empty, falsy or unrecognised value must leave the cap enabled.
func TestCanvasA2ASyncDisabled(t *testing.T) {
	// Each row: raw env value, expected kill-switch state, and the failure
	// format used when the state does not match.
	scenarios := []struct {
		raw      string
		disabled bool
		failFmt  string
	}{
		{"", false, "A2A_CANVAS_SYNC_DISABLE unset should be false (cap enabled); got %v"},
		{"1", true, "A2A_CANVAS_SYNC_DISABLE=1 should disable the cap; got %v"},
		{"true", true, "A2A_CANVAS_SYNC_DISABLE=true should disable the cap; got %v"},
		{"0", false, "A2A_CANVAS_SYNC_DISABLE=0 should leave the cap enabled; got %v"},
		{"false", false, "A2A_CANVAS_SYNC_DISABLE=false should leave the cap enabled; got %v"},
		{"invalid", false, "invalid A2A_CANVAS_SYNC_DISABLE should fall back to false (cap enabled); got %v"},
	}

	for _, sc := range scenarios {
		t.Setenv("A2A_CANVAS_SYNC_DISABLE", sc.raw)
		got := canvasA2ASyncDisabled()
		if got == sc.disabled {
			continue
		}
		t.Errorf(sc.failFmt, got)
	}
}
// TestProxyA2A_CanvasCapAndQueue_RuntimeKillSwitchDisabled pins the
// integration behavior of the runtime kill-switch: with
// A2A_CANVAS_SYNC_DISABLE=1, a canvas turn that would otherwise exceed the
// synchronous budget (50ms budget vs. an agent that holds for 500ms) must NOT
// be answered with `{status:"queued"}`. The handler is expected to take the
// legacy synchronous path, wait out the agent, and return its reply inline.
//
// This is the ops escape hatch for the core#2751 fix: if the async path
// misbehaves in prod it can be disabled without a deploy, and this test
// shows the disable is actually honored by ProxyA2A.
//
// NOTE(review): the assertions at the end depend on wall-clock timing
// (400ms lower bound, 1s upper bound) — confirm these hold on slow CI.
func TestProxyA2A_CanvasCapAndQueue_RuntimeKillSwitchDisabled(t *testing.T) {
// A 50ms budget would trigger the queued ack for this agent if the
// kill-switch were ignored. Setting the kill-switch as well inverts the
// expectation: the handler should block for the full agent hold and
// return the real reply.
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms")
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "1")
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Fake agent: sleeps 500ms (ten times the budget) before replying with a
// JSON-RPC result whose reply string is unique to this test.
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(500 * time.Millisecond)
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-1","result":{"status":"ok","reply":"kill-switch-disabled-reply"}}`)
}))
defer agentServer.Close()
// Register the fake agent's URL for workspace "ws-killswitch" in the test
// Redis under the ws:<id>:url key.
mr.Set(fmt.Sprintf("ws:%s:url", "ws-killswitch"), agentServer.URL)
expectBudgetCheck(mock, "ws-killswitch")
// Two activity_logs inserts are queued on the mock. Per the original note
// these correspond to persistUserMessageAtIngest and logA2ASuccess firing
// on the synchronous path (no detached goroutine). The test never calls
// ExpectationsWereMet, so the inserts are tolerated rather than required.
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
// Build a gin test context for POST /workspaces/ws-killswitch/a2a carrying
// a message/send JSON-RPC request.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-killswitch"}}
body := `{"jsonrpc":"2.0","id":"req-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-ks-001","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-killswitch/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
// Time the handler call so the synchronous wait can be asserted below.
start := time.Now()
handler.ProxyA2A(c)
elapsed := time.Since(start)
// 1. The HTTP response must be the agent's own reply, not the queued
// ack: status 200, no "queued" marker, and the test-specific reply
// string present in the body.
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (agent reply), got %d: %s", w.Code, w.Body.String())
}
if strings.Contains(w.Body.String(), `"queued"`) {
t.Errorf("kill-switch should suppress the queued ack; got: %s", w.Body.String())
}
if !strings.Contains(w.Body.String(), `"kill-switch-disabled-reply"`) {
t.Errorf("expected actual agent reply (containing the reply field), got: %s", w.Body.String())
}
// 2. The handler must have waited roughly the 500ms agent hold rather
// than returning at the 50ms budget. The 400ms floor leaves slack below
// the agent's sleep; the 1s ceiling catches an unexpectedly long wait.
if elapsed < 400*time.Millisecond {
t.Errorf("handler returned too quickly (%v); the kill-switch should NOT have fired the queued-ack path — the agent should have replied inline after ~500ms", elapsed)
}
if elapsed > 1*time.Second {
t.Errorf("handler took too long (%v); expected ~500ms agent hold", elapsed)
}
}
// TestProxyA2A_CanvasCapAndQueue_EndToEndContract pins the full cap-and-queue
// contract (core#2751): a canvas turn that outlives the synchronous budget is
// acked with `{status:"queued"}`, the dispatch keeps running after the
// handler returns, and the agent's eventual reply is broadcast as
// A2A_RESPONSE carrying the originating message_id (so the canvas WS handler
// can attach the reply to the right chat bubble) and the actual reply body.
//
// A 50ms budget against an agent that holds for 500ms forces the queued
// branch deterministically. After the handler returns, the test polls the
// recording broadcaster for up to 2s and asserts on what it captured.
//
// NOTE(review): the polling loop reads rec.calls directly while the detached
// dispatch may still be recording broadcasts — confirm recordingBroadcaster
// makes that access safe (run with -race).
func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
// A tiny budget guarantees the agent below cannot answer in time.
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms")
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
// recordingBroadcaster captures broadcast calls so they can be inspected
// after the handler has returned.
rec := &recordingBroadcaster{}
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Fake agent: holds the connection for 500ms (ten times the budget) and
// then replies with a JSON-RPC result containing reply "hello".
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(500 * time.Millisecond)
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-1","result":{"status":"ok","reply":"hello"}}`)
}))
defer agentServer.Close()
// Register the fake agent's URL for workspace "ws-e2e" in the test Redis.
mr.Set(fmt.Sprintf("ws:%s:url", "ws-e2e"), agentServer.URL)
expectBudgetCheck(mock, "ws-e2e")
// Two activity_logs inserts are queued on the mock. Per the original note
// these are persistUserMessageAtIngest (in the detached goroutine) and
// logA2ASuccess (on agent reply). Because their ordering is asynchronous,
// the test does not call ExpectationsWereMet.
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-e2e"}}
// The request's messageId is "msg-e2e-001"; the broadcast is later checked
// for this exact value so the canvas can attach the reply to the right
// chat bubble.
body := `{"jsonrpc":"2.0","id":"req-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-e2e-001","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-e2e/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
// Time the handler call to verify it returns near the budget.
start := time.Now()
handler.ProxyA2A(c)
elapsed := time.Since(start)
// 1. The HTTP response must be the queued ack, not the agent reply:
// status 200 with the "queued" and "push-async" markers in the body.
if w.Code != http.StatusOK {
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), `"queued"`) {
t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String())
}
if !strings.Contains(w.Body.String(), `"push-async"`) {
t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String())
}
// The handler must return around the 50ms budget; 300ms leaves generous
// slack while staying well below the agent's 500ms hold.
if elapsed > 300*time.Millisecond {
t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed)
}
// 2. Poll for the broadcast produced once the detached dispatch finishes.
// The agent takes ~500ms; per the original note the broadcast is recorded
// synchronously inside logA2ASuccess. The loop re-checks every 20ms for
// up to 2s.
deadline := time.Now().Add(2 * time.Second)
var sawA2AResponse bool
var sawResponseBodyContent bool
for time.Now().Before(deadline) {
// Note: the loop variable c shadows the gin context c declared above;
// inside this loop c is a recorded broadcast call.
for _, c := range rec.calls {
if c.eventType == "A2A_RESPONSE" && c.workspaceID == "ws-e2e" {
// The broadcast must carry the originating message_id so the
// canvas WS handler can attach the reply to the right chat
// bubble.
if mid, ok := c.payload["message_id"].(string); ok && mid == "msg-e2e-001" {
sawA2AResponse = true
}
// The broadcast must also deliver the ACTUAL result payload
// (Researcher #11553 RC #1). A regression that broadcasts an
// empty or wrong response_body with the right message_id would
// pass a message_id-only check while leaving the canvas with
// nothing to render — the failure class this PR is meant to
// close. The check walks response_body -> result -> reply and
// expects the agent's "hello"; the payload is expected to be
// the decoded JSON map
// `{"id":..., "jsonrpc":..., "result":{"reply":"hello","status":"ok"}}`.
if rbMap, ok := c.payload["response_body"].(map[string]interface{}); ok {
if resultMap, ok := rbMap["result"].(map[string]interface{}); ok {
if reply, ok := resultMap["reply"].(string); ok && reply == "hello" {
sawResponseBodyContent = true
}
}
}
}
}
// Stop polling as soon as both properties have been observed.
if sawA2AResponse && sawResponseBodyContent {
break
}
time.Sleep(20 * time.Millisecond)
}
// Fail with the full list of recorded calls to aid debugging.
if !sawA2AResponse {
t.Fatalf("expected A2A_RESPONSE broadcast for ws-e2e with message_id=msg-e2e-001 within 2s; recorded: %+v", rec.calls)
}
if !sawResponseBodyContent {
t.Fatalf("expected A2A_RESPONSE payload to carry the agent's actual reply content (`reply:\"hello\"`) so the canvas can render it; recorded: %+v", rec.calls)
}
}
// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE
// WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true,