|
|
|
@@ -3020,6 +3020,282 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestCanvasA2ASyncBudget_DefaultIs90s pins the core#2751 durable fix at
|
|
|
|
|
// the unit level: the cap-and-queue synchronous-budget default must be
|
|
|
|
|
// 90s (just under Cloudflare's ~100s edge limit). A regression to 0
|
|
|
|
|
// (the legacy always-sync value) would re-expose the canvas path to
|
|
|
|
|
// the 524+WS-starvation class. Test reads the function directly — no
|
|
|
|
|
// ProxyA2A integration setup needed.
|
|
|
|
|
func TestCanvasA2ASyncBudget_DefaultIs90s(t *testing.T) {
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "")
|
|
|
|
|
|
|
|
|
|
got := canvasA2ASyncBudget()
|
|
|
|
|
want := 90 * time.Second
|
|
|
|
|
if got != want {
|
|
|
|
|
t.Fatalf("canvasA2ASyncBudget() = %v, want %v — default regression on the core#2751 durable fix (regression to 0 would re-expose canvas to CF 524)", got, want)
|
|
|
|
|
}
|
|
|
|
|
if got <= 0 {
|
|
|
|
|
t.Fatalf("canvasA2ASyncBudget() = %v, must be > 0 (a non-positive default would re-enable the legacy always-sync path that causes 524+WS-starvation)", got)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestCanvasA2ASyncBudget_EnvOverride covers the operator tuning path:
|
|
|
|
|
// A2A_CANVAS_SYNC_BUDGET=60s → 60s cap; any other valid positive
|
|
|
|
|
// duration → that duration. Invalid values fall back to the 90s default.
|
|
|
|
|
//
|
|
|
|
|
// Note: envx.Duration treats `0` and negative values as "not set" (the
|
|
|
|
|
// `d > 0` check), so they fall through to the 90s default. The
|
|
|
|
|
// runtime kill-switch A2A_CANVAS_SYNC_DISABLE (separate env var) is
|
|
|
|
|
// the operator's way to disable the cap at runtime — see
|
|
|
|
|
// TestCanvasA2ASyncDisabled.
|
|
|
|
|
func TestCanvasA2ASyncBudget_EnvOverride(t *testing.T) {
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "60s")
|
|
|
|
|
if got := canvasA2ASyncBudget(); got != 60*time.Second {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_BUDGET=60s should set the cap to 60s; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "120s")
|
|
|
|
|
if got := canvasA2ASyncBudget(); got != 120*time.Second {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_BUDGET=120s should set the cap to 120s; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "invalid")
|
|
|
|
|
if got := canvasA2ASyncBudget(); got != 90*time.Second {
|
|
|
|
|
t.Errorf("invalid A2A_CANVAS_SYNC_BUDGET should fall back to the 90s default; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "0")
|
|
|
|
|
if got := canvasA2ASyncBudget(); got != 90*time.Second {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_BUDGET=0 should fall back to the 90s default (envx treats 0 as not-set); got %v", got)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestCanvasA2ASyncDisabled pins the runtime kill-switch (core#2751 RC
|
|
|
|
|
// #11552): A2A_CANVAS_SYNC_DISABLE=1 (or any truthy value) flips the
|
|
|
|
|
// canvas to the legacy synchronous path, independent of the budget.
|
|
|
|
|
// Defaults to false (cap enabled). Truthy values: 1, t, true, TRUE, T
|
|
|
|
|
// (per envx.Bool semantics). Falsy: 0, f, false, FALSE, F, empty.
|
|
|
|
|
func TestCanvasA2ASyncDisabled(t *testing.T) {
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "")
|
|
|
|
|
if got := canvasA2ASyncDisabled(); got != false {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_DISABLE unset should be false (cap enabled); got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "1")
|
|
|
|
|
if got := canvasA2ASyncDisabled(); got != true {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_DISABLE=1 should disable the cap; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "true")
|
|
|
|
|
if got := canvasA2ASyncDisabled(); got != true {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_DISABLE=true should disable the cap; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "0")
|
|
|
|
|
if got := canvasA2ASyncDisabled(); got != false {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_DISABLE=0 should leave the cap enabled; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "false")
|
|
|
|
|
if got := canvasA2ASyncDisabled(); got != false {
|
|
|
|
|
t.Errorf("A2A_CANVAS_SYNC_DISABLE=false should leave the cap enabled; got %v", got)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "invalid")
|
|
|
|
|
if got := canvasA2ASyncDisabled(); got != false {
|
|
|
|
|
t.Errorf("invalid A2A_CANVAS_SYNC_DISABLE should fall back to false (cap enabled); got %v", got)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestProxyA2A_CanvasCapAndQueue_RuntimeKillSwitchDisabled pins the
|
|
|
|
|
// integration behavior: when A2A_CANVAS_SYNC_DISABLE=1, a canvas turn
|
|
|
|
|
// that WOULD exceed the synchronous budget (e.g., a tiny 50ms budget
|
|
|
|
|
// + a slow 500ms agent) does NOT return `{status:"queued"}` — the
|
|
|
|
|
// kill-switch forces the legacy synchronous path, the handler waits
|
|
|
|
|
// the full agent duration, and the actual reply is returned inline.
|
|
|
|
|
//
|
|
|
|
|
// This is the CTO-priority ops escape hatch for the durable fix: if
|
|
|
|
|
// the async path misbehaves in prod, ops can disable it without a
|
|
|
|
|
// deploy. This test proves the disable actually works.
|
|
|
|
|
func TestProxyA2A_CanvasCapAndQueue_RuntimeKillSwitchDisabled(t *testing.T) {
|
|
|
|
|
// Sub-budget (50ms) to force the queued path... if the kill-switch
|
|
|
|
|
// were NOT honored, this would trigger the queued ack. The kill-
|
|
|
|
|
// switch inverts the expectation: the handler should wait the full
|
|
|
|
|
// agent hold and return the actual reply.
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms")
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_DISABLE", "1")
|
|
|
|
|
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
mr := setupTestRedis(t)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
|
|
|
|
|
|
|
|
|
// Agent holds 500ms (>> 50ms budget — would force queued path if
|
|
|
|
|
// the kill-switch were not honored).
|
|
|
|
|
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
|
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-1","result":{"status":"ok","reply":"kill-switch-disabled-reply"}}`)
|
|
|
|
|
}))
|
|
|
|
|
defer agentServer.Close()
|
|
|
|
|
|
|
|
|
|
mr.Set(fmt.Sprintf("ws:%s:url", "ws-killswitch"), agentServer.URL)
|
|
|
|
|
expectBudgetCheck(mock, "ws-killswitch")
|
|
|
|
|
// persistUserMessageAtIngest + logA2ASuccess fire on the synchronous
|
|
|
|
|
// path (no detached goroutine). .Maybe()-style tolerance.
|
|
|
|
|
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "ws-killswitch"}}
|
|
|
|
|
body := `{"jsonrpc":"2.0","id":"req-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-ks-001","parts":[{"text":"hi"}]}}}`
|
|
|
|
|
c.Request = httptest.NewRequest("POST", "/workspaces/ws-killswitch/a2a", bytes.NewBufferString(body))
|
|
|
|
|
c.Request.Header.Set("Content-Type", "application/json")
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
handler.ProxyA2A(c)
|
|
|
|
|
elapsed := time.Since(start)
|
|
|
|
|
|
|
|
|
|
// 1. The HTTP response is the ACTUAL AGENT REPLY (not the queued
|
|
|
|
|
// ack). The kill-switch forces the legacy synchronous path, so the
|
|
|
|
|
// handler waits the full agent duration.
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Fatalf("expected 200 (agent reply), got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
if strings.Contains(w.Body.String(), `"queued"`) {
|
|
|
|
|
t.Errorf("kill-switch should suppress the queued ack; got: %s", w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
if !strings.Contains(w.Body.String(), `"kill-switch-disabled-reply"`) {
|
|
|
|
|
t.Errorf("expected actual agent reply (containing the reply field), got: %s", w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
// 2. The handler waited the full ~500ms agent hold (NOT ~50ms
|
|
|
|
|
// budget). Proves the kill-switch bypasses the cap-and-queue
|
|
|
|
|
// goroutine entirely.
|
|
|
|
|
if elapsed < 400*time.Millisecond {
|
|
|
|
|
t.Errorf("handler returned too quickly (%v); the kill-switch should NOT have fired the queued-ack path — the agent should have replied inline after ~500ms", elapsed)
|
|
|
|
|
}
|
|
|
|
|
if elapsed > 1*time.Second {
|
|
|
|
|
t.Errorf("handler took too long (%v); expected ~500ms agent hold", elapsed)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestProxyA2A_CanvasCapAndQueue_EndToEndContract pins the FULL contract
|
|
|
|
|
// (core#2751): a canvas turn that outlives the synchronous budget returns
|
|
|
|
|
// `{status:"queued"}` immediately, the dispatch continues on a detached
|
|
|
|
|
// forward ctx, and the agent's eventual reply is durably logged + broadcast
|
|
|
|
|
// as A2A_RESPONSE with the originating message_id (so the canvas WS handler
|
|
|
|
|
// can attach the reply to the right chat bubble).
|
|
|
|
|
//
|
|
|
|
|
// Uses a SUB-BUDGET (50ms) to force the queued branch deterministically; the
|
|
|
|
|
// agent server holds 500ms before replying, so the HTTP handler returns
|
|
|
|
|
// `queued` well before the agent finishes. Then we wait for the detached
|
|
|
|
|
// goroutine + broadcaster to complete and assert the recorder saw the
|
|
|
|
|
// A2A_RESPONSE broadcast with the right message_id.
|
|
|
|
|
func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
|
|
|
|
|
// Force the queued branch deterministically with a tiny budget.
|
|
|
|
|
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms")
|
|
|
|
|
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
mr := setupTestRedis(t)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
rec := &recordingBroadcaster{}
|
|
|
|
|
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
|
|
|
|
|
|
|
|
|
// Agent holds the connection 500ms (>> 50ms budget → forces queued path).
|
|
|
|
|
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
|
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-1","result":{"status":"ok","reply":"hello"}}`)
|
|
|
|
|
}))
|
|
|
|
|
defer agentServer.Close()
|
|
|
|
|
|
|
|
|
|
mr.Set(fmt.Sprintf("ws:%s:url", "ws-e2e"), agentServer.URL)
|
|
|
|
|
expectBudgetCheck(mock, "ws-e2e")
|
|
|
|
|
// persistUserMessageAtIngest fires in the detached goroutine; also
|
|
|
|
|
// logA2ASuccess fires on agent reply. .Maybe()-style tolerance: async
|
|
|
|
|
// ordering means we don't strictly assert ExpectationsWereMet.
|
|
|
|
|
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "ws-e2e"}}
|
|
|
|
|
// message_id = "msg-e2e-001" — used to verify the broadcast carries
|
|
|
|
|
// the right originating message_id so the canvas can attach the reply
|
|
|
|
|
// to the right chat bubble.
|
|
|
|
|
body := `{"jsonrpc":"2.0","id":"req-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-e2e-001","parts":[{"text":"hi"}]}}}`
|
|
|
|
|
c.Request = httptest.NewRequest("POST", "/workspaces/ws-e2e/a2a", bytes.NewBufferString(body))
|
|
|
|
|
c.Request.Header.Set("Content-Type", "application/json")
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
handler.ProxyA2A(c)
|
|
|
|
|
elapsed := time.Since(start)
|
|
|
|
|
|
|
|
|
|
// 1. The HTTP response is the queued ack (not the agent reply).
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
if !strings.Contains(w.Body.String(), `"queued"`) {
|
|
|
|
|
t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
if !strings.Contains(w.Body.String(), `"push-async"`) {
|
|
|
|
|
t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
// Returned at ~budget, NOT after the (blocked) agent.
|
|
|
|
|
if elapsed > 300*time.Millisecond {
|
|
|
|
|
t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 2. Wait for the detached goroutine to finish + the broadcast to fire.
|
|
|
|
|
// The agent takes ~500ms; the broadcast is recorded synchronously
|
|
|
|
|
// inside logA2ASuccess. 2s is plenty of headroom.
|
|
|
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
|
|
|
var sawA2AResponse bool
|
|
|
|
|
var sawResponseBodyContent bool
|
|
|
|
|
for time.Now().Before(deadline) {
|
|
|
|
|
for _, c := range rec.calls {
|
|
|
|
|
if c.eventType == "A2A_RESPONSE" && c.workspaceID == "ws-e2e" {
|
|
|
|
|
// Assert the originating message_id is carried so the
|
|
|
|
|
// canvas WS handler can attach the reply to the right
|
|
|
|
|
// chat bubble.
|
|
|
|
|
if mid, ok := c.payload["message_id"].(string); ok && mid == "msg-e2e-001" {
|
|
|
|
|
sawA2AResponse = true
|
|
|
|
|
}
|
|
|
|
|
// Assert the ACTUAL result payload is delivered
|
|
|
|
|
// (Researcher #11553 RC #1). A regression that broadcasts
|
|
|
|
|
// an empty/wrong response_body with the right message_id
|
|
|
|
|
// would pass the message_id-only check while leaving the
|
|
|
|
|
// canvas with no result to render — the exact failure
|
|
|
|
|
// class this PR is meant to close. The agent's reply
|
|
|
|
|
// is the `reply: "hello"` field; the broadcast payload's
|
|
|
|
|
// response_body is the deserialized JSON map
|
|
|
|
|
// `{"id":..., "jsonrpc":..., "result":{"reply":"hello","status":"ok"}}`.
|
|
|
|
|
if rbMap, ok := c.payload["response_body"].(map[string]interface{}); ok {
|
|
|
|
|
if resultMap, ok := rbMap["result"].(map[string]interface{}); ok {
|
|
|
|
|
if reply, ok := resultMap["reply"].(string); ok && reply == "hello" {
|
|
|
|
|
sawResponseBodyContent = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if sawA2AResponse && sawResponseBodyContent {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
|
}
|
|
|
|
|
if !sawA2AResponse {
|
|
|
|
|
t.Fatalf("expected A2A_RESPONSE broadcast for ws-e2e with message_id=msg-e2e-001 within 2s; recorded: %+v", rec.calls)
|
|
|
|
|
}
|
|
|
|
|
if !sawResponseBodyContent {
|
|
|
|
|
t.Fatalf("expected A2A_RESPONSE payload to carry the agent's actual reply content (`reply:\"hello\"`) so the canvas can render it; recorded: %+v", rec.calls)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE
|
|
|
|
|
// WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true,
|
|
|
|
|