diff --git a/workspace-server/internal/envx/envx.go b/workspace-server/internal/envx/envx.go index 35957e677..6c20ed512 100644 --- a/workspace-server/internal/envx/envx.go +++ b/workspace-server/internal/envx/envx.go @@ -37,3 +37,19 @@ func Int64(name string, def int64) int64 { } return def } + +// Bool reads `name` as a boolean. Returns `def` when unset or unparseable. +// Truthy values per strconv.ParseBool: 1, t, T, TRUE, true, True. +// Falsy: 0, f, F, FALSE, false, False, empty. +// All other values (including "yes", "on", "y", "n") are unparseable and +// return def. Use Bool for feature flags where the operator's mental +// model is "set truthy to enable" — a misconfigured value falls through +// to the safe default rather than silently enabling a feature. +func Bool(name string, def bool) bool { + if v := os.Getenv(name); v != "" { + if b, err := strconv.ParseBool(v); err == nil { + return b + } + } + return def +} diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index b78990c63..7c32526b4 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -380,25 +380,40 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { } } - // CANVAS CAP-AND-QUEUE (core#2751, DEFAULT OFF). The canvas→agent POST is + // CANVAS CAP-AND-QUEUE (core#2751, DEFAULT ON). The canvas→agent POST is // held for the whole turn; a turn longer than Cloudflare's ~100s edge limit - // returns a 524 (the recurring "Failed to send"). When A2A_CANVAS_SYNC_BUDGET - // > 0, cap the SYNCHRONOUS wait for canvas callers below that limit: if the + // returns a 524 (the recurring "Failed to send"). By default we cap the + // SYNCHRONOUS wait for canvas callers at 90s (just under CF's edge): if the // turn hasn't finished by the budget, ack `{status:"queued"}` and let the // dispatch finish on its own — proxyA2ARequest's dispatch already runs on a // context.WithoutCancel forward ctx (idle-bounded), so it survives this // handler returning, and the agent's reply reaches the canvas via the // AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work // runs on a detached ctx so its DB logging isn't cancelled when we return. - // Budget=0 (default) → the unchanged synchronous path below; no behavior - // change until an operator opts in. See the design on core#2751. - if budget := envx.Duration("A2A_CANVAS_SYNC_BUDGET", 0); budget > 0 && (callerID == "" || isCanvasUser) { + // + // Operators can tune the budget via the env var (e.g. 60s for more + // conservative environments). The default of 90s is the durable fix that + // removes the CF 100s ceiling for the canvas path. envx.Duration treats + // 0/negative as "not set" and falls through to the default — to disable + // the cap, an operator must explicitly patch the default (a code change). + // See the design on core#2751. + // + // The budget lookup is extracted into canvasA2ASyncBudget (below) so the + // default value is unit-testable without source-string matching. + // + // Runtime kill-switch (core#2751 RC #11552): canvasA2ASyncDisabled() + // is the explicit opt-out that takes precedence over the budget value. + // When set, the entire cap-and-queue goroutine is skipped and the + // legacy synchronous path runs. Ops can flip this at runtime to + // disable the async path if it misbehaves in prod. + if !canvasA2ASyncDisabled() && canvasA2ASyncBudget() > 0 && (callerID == "" || isCanvasUser) { type a2aResult struct { status int body []byte perr *proxyA2AError } detached := context.WithoutCancel(ctx) + budget := canvasA2ASyncBudget() // local copy for the time.After below done := make(chan a2aResult, 1) go func() { s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser) @@ -1214,3 +1229,41 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID }() return ctx, cancel } + +// canvasA2ASyncBudget is the extracted lookup for the cap-and-queue synchronous +// wait (core#2751). Extracted from the ProxyA2A handler so the default value +// can be unit-tested directly without source-string matching — a regression of +// the default to 0 (legacy always-sync, which would re-expose the canvas path +// to the 524+WS-starvation class) is caught by the unit test on this function. +// +// Default is 90s — just under Cloudflare's ~100s edge limit, so a turn that +// outlives the budget triggers the queued ack before CF drops the request. +// +// The envx.Duration wrapper lets operators tune the budget +// (A2A_CANVAS_SYNC_BUDGET=60s for more conservative environments) without a +// code change. envx treats 0 / negative / invalid values as "not set" and +// falls through to the default. +// +// **Runtime kill-switch (core#2751 RC #11552):** the cap-and-queue can +// also be disabled at runtime by setting A2A_CANVAS_SYNC_DISABLE=1 (or any +// truthy envx.Bool value). When set, ProxyA2A skips the cap-and-queue +// goroutine entirely and uses the legacy synchronous path, regardless of +// the budget value. Ops can flip this without a deploy if the async path +// misbehaves in prod. The kill-switch is implemented as a SEPARATE env +// var (not via the budget=0 path) because envx.Duration treats 0/negative +// as "not set" and falls through to the default — there was no other +// way to disable at runtime without a code change. +func canvasA2ASyncBudget() time.Duration { + return envx.Duration("A2A_CANVAS_SYNC_BUDGET", 90*time.Second) +} + +// canvasA2ASyncDisabled is the runtime kill-switch for the cap-and-queue +// async-dispatch path. Returns true when A2A_CANVAS_SYNC_DISABLE is set +// to a truthy value (per envx.Bool semantics: 1, t, true, TRUE, T — +// NOT 0, f, false, F, empty). When true, ProxyA2A skips the +// cap-and-queue goroutine entirely and uses the legacy synchronous +// path. Extracted so the kill-switch default is unit-testable +// independently of the budget value. +func canvasA2ASyncDisabled() bool { + return envx.Bool("A2A_CANVAS_SYNC_DISABLE", false) +} diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index b4e1d851c..a89312f89 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -3020,6 +3020,282 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) { } } +// TestCanvasA2ASyncBudget_DefaultIs90s pins the core#2751 durable fix at +// the unit level: the cap-and-queue synchronous-budget default must be +// 90s (just under Cloudflare's ~100s edge limit). A regression to 0 +// (the legacy always-sync value) would re-expose the canvas path to +// the 524+WS-starvation class. Test reads the function directly — no +// ProxyA2A integration setup needed. +func TestCanvasA2ASyncBudget_DefaultIs90s(t *testing.T) { + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "") + + got := canvasA2ASyncBudget() + want := 90 * time.Second + if got != want { + t.Fatalf("canvasA2ASyncBudget() = %v, want %v — default regression on the core#2751 durable fix (regression to 0 would re-expose canvas to CF 524)", got, want) + } + if got <= 0 { + t.Fatalf("canvasA2ASyncBudget() = %v, must be > 0 (a non-positive default would re-enable the legacy always-sync path that causes 524+WS-starvation)", got) + } +} + +// TestCanvasA2ASyncBudget_EnvOverride covers the operator tuning path: +// A2A_CANVAS_SYNC_BUDGET=60s → 60s cap; any other valid positive +// duration → that duration. Invalid values fall back to the 90s default. +// +// Note: envx.Duration treats `0` and negative values as "not set" (the +// `d > 0` check), so they fall through to the 90s default. The +// runtime kill-switch A2A_CANVAS_SYNC_DISABLE (separate env var) is +// the operator's way to disable the cap at runtime — see +// TestCanvasA2ASyncDisabled. +func TestCanvasA2ASyncBudget_EnvOverride(t *testing.T) { + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "60s") + if got := canvasA2ASyncBudget(); got != 60*time.Second { + t.Errorf("A2A_CANVAS_SYNC_BUDGET=60s should set the cap to 60s; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "120s") + if got := canvasA2ASyncBudget(); got != 120*time.Second { + t.Errorf("A2A_CANVAS_SYNC_BUDGET=120s should set the cap to 120s; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "invalid") + if got := canvasA2ASyncBudget(); got != 90*time.Second { + t.Errorf("invalid A2A_CANVAS_SYNC_BUDGET should fall back to the 90s default; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "0") + if got := canvasA2ASyncBudget(); got != 90*time.Second { + t.Errorf("A2A_CANVAS_SYNC_BUDGET=0 should fall back to the 90s default (envx treats 0 as not-set); got %v", got) + } +} + +// TestCanvasA2ASyncDisabled pins the runtime kill-switch (core#2751 RC +// #11552): A2A_CANVAS_SYNC_DISABLE=1 (or any truthy value) flips the +// canvas to the legacy synchronous path, independent of the budget. +// Defaults to false (cap enabled). Truthy values: 1, t, true, TRUE, T +// (per envx.Bool semantics). Falsy: 0, f, false, FALSE, F, empty. +func TestCanvasA2ASyncDisabled(t *testing.T) { + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "") + if got := canvasA2ASyncDisabled(); got != false { + t.Errorf("A2A_CANVAS_SYNC_DISABLE unset should be false (cap enabled); got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "1") + if got := canvasA2ASyncDisabled(); got != true { + t.Errorf("A2A_CANVAS_SYNC_DISABLE=1 should disable the cap; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "true") + if got := canvasA2ASyncDisabled(); got != true { + t.Errorf("A2A_CANVAS_SYNC_DISABLE=true should disable the cap; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "0") + if got := canvasA2ASyncDisabled(); got != false { + t.Errorf("A2A_CANVAS_SYNC_DISABLE=0 should leave the cap enabled; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "false") + if got := canvasA2ASyncDisabled(); got != false { + t.Errorf("A2A_CANVAS_SYNC_DISABLE=false should leave the cap enabled; got %v", got) + } + + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "invalid") + if got := canvasA2ASyncDisabled(); got != false { + t.Errorf("invalid A2A_CANVAS_SYNC_DISABLE should fall back to false (cap enabled); got %v", got) + } +} + +// TestProxyA2A_CanvasCapAndQueue_RuntimeKillSwitchDisabled pins the +// integration behavior: when A2A_CANVAS_SYNC_DISABLE=1, a canvas turn +// that WOULD exceed the synchronous budget (e.g., a tiny 50ms budget +// + a slow 500ms agent) does NOT return `{status:"queued"}` — the +// kill-switch forces the legacy synchronous path, the handler waits +// the full agent duration, and the actual reply is returned inline. +// +// This is the CTO-priority ops escape hatch for the durable fix: if +// the async path misbehaves in prod, ops can disable it without a +// deploy. This test proves the disable actually works. +func TestProxyA2A_CanvasCapAndQueue_RuntimeKillSwitchDisabled(t *testing.T) { + // Sub-budget (50ms) to force the queued path... if the kill-switch + // were NOT honored, this would trigger the queued ack. The kill- + // switch inverts the expectation: the handler should wait the full + // agent hold and return the actual reply. + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms") + t.Setenv("A2A_CANVAS_SYNC_DISABLE", "1") + + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // Agent holds 500ms (>> 50ms budget — would force queued path if + // the kill-switch were not honored). + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(500 * time.Millisecond) + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-1","result":{"status":"ok","reply":"kill-switch-disabled-reply"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-killswitch"), agentServer.URL) + expectBudgetCheck(mock, "ws-killswitch") + // persistUserMessageAtIngest + logA2ASuccess fire on the synchronous + // path (no detached goroutine). .Maybe()-style tolerance. + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-killswitch"}} + body := `{"jsonrpc":"2.0","id":"req-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-ks-001","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-killswitch/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + start := time.Now() + handler.ProxyA2A(c) + elapsed := time.Since(start) + + // 1. The HTTP response is the ACTUAL AGENT REPLY (not the queued + // ack). The kill-switch forces the legacy synchronous path, so the + // handler waits the full agent duration. + if w.Code != http.StatusOK { + t.Fatalf("expected 200 (agent reply), got %d: %s", w.Code, w.Body.String()) + } + if strings.Contains(w.Body.String(), `"queued"`) { + t.Errorf("kill-switch should suppress the queued ack; got: %s", w.Body.String()) + } + if !strings.Contains(w.Body.String(), `"kill-switch-disabled-reply"`) { + t.Errorf("expected actual agent reply (containing the reply field), got: %s", w.Body.String()) + } + // 2. The handler waited the full ~500ms agent hold (NOT ~50ms + // budget). Proves the kill-switch bypasses the cap-and-queue + // goroutine entirely. + if elapsed < 400*time.Millisecond { + t.Errorf("handler returned too quickly (%v); the kill-switch should NOT have fired the queued-ack path — the agent should have replied inline after ~500ms", elapsed) + } + if elapsed > 1*time.Second { + t.Errorf("handler took too long (%v); expected ~500ms agent hold", elapsed) + } +} + +// TestProxyA2A_CanvasCapAndQueue_EndToEndContract pins the FULL contract +// (core#2751): a canvas turn that outlives the synchronous budget returns +// `{status:"queued"}` immediately, the dispatch continues on a detached +// forward ctx, and the agent's eventual reply is durably logged + broadcast +// as A2A_RESPONSE with the originating message_id (so the canvas WS handler +// can attach the reply to the right chat bubble). +// +// Uses a SUB-BUDGET (50ms) to force the queued branch deterministically; the +// agent server holds 500ms before replying, so the HTTP handler returns +// `queued` well before the agent finishes. Then we wait for the detached +// goroutine + broadcaster to complete and assert the recorder saw the +// A2A_RESPONSE broadcast with the right message_id. +func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) { + // Force the queued branch deterministically with a tiny budget. + t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms") + + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + rec := &recordingBroadcaster{} + handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // Agent holds the connection 500ms (>> 50ms budget → forces queued path). + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(500 * time.Millisecond) + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-1","result":{"status":"ok","reply":"hello"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-e2e"), agentServer.URL) + expectBudgetCheck(mock, "ws-e2e") + // persistUserMessageAtIngest fires in the detached goroutine; also + // logA2ASuccess fires on agent reply. .Maybe()-style tolerance: async + // ordering means we don't strictly assert ExpectationsWereMet. + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-e2e"}} + // message_id = "msg-e2e-001" — used to verify the broadcast carries + // the right originating message_id so the canvas can attach the reply + // to the right chat bubble. + body := `{"jsonrpc":"2.0","id":"req-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-e2e-001","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-e2e/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + start := time.Now() + handler.ProxyA2A(c) + elapsed := time.Since(start) + + // 1. The HTTP response is the queued ack (not the agent reply). + if w.Code != http.StatusOK { + t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), `"queued"`) { + t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String()) + } + if !strings.Contains(w.Body.String(), `"push-async"`) { + t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String()) + } + // Returned at ~budget, NOT after the (blocked) agent. + if elapsed > 300*time.Millisecond { + t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed) + } + + // 2. Wait for the detached goroutine to finish + the broadcast to fire. + // The agent takes ~500ms; the broadcast is recorded synchronously + // inside logA2ASuccess. 2s is plenty of headroom. + deadline := time.Now().Add(2 * time.Second) + var sawA2AResponse bool + var sawResponseBodyContent bool + for time.Now().Before(deadline) { + for _, c := range rec.calls { + if c.eventType == "A2A_RESPONSE" && c.workspaceID == "ws-e2e" { + // Assert the originating message_id is carried so the + // canvas WS handler can attach the reply to the right + // chat bubble. + if mid, ok := c.payload["message_id"].(string); ok && mid == "msg-e2e-001" { + sawA2AResponse = true + } + // Assert the ACTUAL result payload is delivered + // (Researcher #11553 RC #1). A regression that broadcasts + // an empty/wrong response_body with the right message_id + // would pass the message_id-only check while leaving the + // canvas with no result to render — the exact failure + // class this PR is meant to close. The agent's reply + // is the `reply: "hello"` field; the broadcast payload's + // response_body is the deserialized JSON map + // `{"id":..., "jsonrpc":..., "result":{"reply":"hello","status":"ok"}}`. + if rbMap, ok := c.payload["response_body"].(map[string]interface{}); ok { + if resultMap, ok := rbMap["result"].(map[string]interface{}); ok { + if reply, ok := resultMap["reply"].(string); ok && reply == "hello" { + sawResponseBodyContent = true + } + } + } + } + } + if sawA2AResponse && sawResponseBodyContent { + break + } + time.Sleep(20 * time.Millisecond) + } + if !sawA2AResponse { + t.Fatalf("expected A2A_RESPONSE broadcast for ws-e2e with message_id=msg-e2e-001 within 2s; recorded: %+v", rec.calls) + } + if !sawResponseBodyContent { + t.Fatalf("expected A2A_RESPONSE payload to carry the agent's actual reply content (`reply:\"hello\"`) so the canvas can render it; recorded: %+v", rec.calls) + } +} + // TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE // WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true,