diff --git a/workspace-server/internal/handlers/a2a_get_task_test.go b/workspace-server/internal/handlers/a2a_get_task_test.go new file mode 100644 index 000000000..9bb021b60 --- /dev/null +++ b/workspace-server/internal/handlers/a2a_get_task_test.go @@ -0,0 +1,287 @@ +package handlers + +// a2a_get_task_test.go — integration tests for the GetA2ATask +// HTTP handler (#2818, #2751). The handler is the canvas's +// late-arrival recovery path: when the WS push is missed, the +// canvas polls GET /workspaces/:id/a2a/task/{task_id} every +// 5s to recover the buffered result. Tests pin: +// - Pending task: 202 + {status:"pending", task_id} (canvas +// keeps polling) +// - Completed task: 200 + buffered result body (canvas +// synthesizes the response and renders the agent reply) +// - Failed task: 200 + buffered error envelope +// - Unknown task_id: 404 (janitor-evicted or never-existed) +// - Cross-workspace: 403 (caller is not the task's creator) +// - Late-arrival race: Complete after a poll-recovery is a +// no-op (TaskStore-level idempotency) + +import ( + "encoding/base64" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/gin-gonic/gin" +) + +// TestGetA2ATask_PendingReturns202 is the canvas's "keep +// polling" signal: the task is still in flight, no result is +// available yet, the canvas should keep its 5s poll timer +// running. HTTP 202 (not 200) so the canvas can distinguish +// "agent still working" from "agent finished" without reading +// the body — the body is a small {status,task_id} envelope. +func TestGetA2ATask_PendingReturns202(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // Use the package-level store (initialized via init()) so + // the test sees the real janitor/wiring. 
+ h := a2aTaskStore.NewTaskHandle("ws-pending", "message/send") + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-pending"}, {Key: "taskId", Value: h.ID}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-pending/a2a/task/"+h.ID, nil) + + handler.GetA2ATask(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected 202 Accepted for pending, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response not valid JSON: %v", err) + } + if resp["status"] != "pending" { + t.Errorf("status = %v, want pending", resp["status"]) + } + if resp["task_id"] != h.ID { + t.Errorf("task_id = %v, want %v", resp["task_id"], h.ID) + } +} + +// TestGetA2ATask_CompletedReturns200WithBody is the canvas's +// "agent finished, here's the result" signal. The body is +// base64-encoded to preserve the exact bytes of the original +// JSON-RPC response (the canvas decodes + re-parses). 
+func TestGetA2ATask_CompletedReturns200WithBody(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + h := a2aTaskStore.NewTaskHandle("ws-done", "message/send") + reply := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hello"}}`) + if err := h.Complete(TaskStatusCompleted, TaskResult{ + Status: 200, + Body: reply, + ContentType: "application/json", + }); err != nil { + t.Fatalf("Complete: %v", err) + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-done"}, {Key: "taskId", Value: h.ID}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-done/a2a/task/"+h.ID, nil) + + handler.GetA2ATask(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 OK for completed, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response not valid JSON: %v", err) + } + if resp["status"] != "completed" { + t.Errorf("status = %v, want completed", resp["status"]) + } + if got, _ := resp["http_status"].(float64); int(got) != 200 { + t.Errorf("http_status = %v, want 200", resp["http_status"]) + } + encoded, _ := resp["body"].(string) + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + t.Fatalf("body is not valid base64: %v", err) + } + if string(decoded) != string(reply) { + t.Errorf("decoded body = %q, want %q", decoded, reply) + } +} + +// TestGetA2ATask_FailedReturns200WithError pins the error +// path: the agent's dispatch returned a non-2xx / transport +// error. The handler returns 200 with status:"failed" and +// the buffered error envelope (the canvas treats this as +// "agent replied, but with an error" — the same shape as a +// synchronous 4xx/5xx). 
+func TestGetA2ATask_FailedReturns200WithError(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + h := a2aTaskStore.NewTaskHandle("ws-err", "message/send") + errEnv := []byte(`{"error":"agent timeout"}`) + if err := h.Complete(TaskStatusFailed, TaskResult{ + Status: http.StatusGatewayTimeout, + Body: errEnv, + ContentType: "application/json", + }); err != nil { + t.Fatalf("Complete: %v", err) + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-err"}, {Key: "taskId", Value: h.ID}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-err/a2a/task/"+h.ID, nil) + + handler.GetA2ATask(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 OK (with error envelope in body) for failed, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response not valid JSON: %v", err) + } + if resp["status"] != "failed" { + t.Errorf("status = %v, want failed", resp["status"]) + } + if got, _ := resp["http_status"].(float64); int(got) != http.StatusGatewayTimeout { + t.Errorf("http_status = %v, want 504", resp["http_status"]) + } + encoded, _ := resp["body"].(string) + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + t.Fatalf("body is not valid base64: %v", err) + } + if !strings.Contains(string(decoded), "agent timeout") { + t.Errorf("decoded body = %q, want substring %q", decoded, "agent timeout") + } +} + +// TestGetA2ATask_UnknownReturns404 pins the 404 contract: +// the task_id is unknown (never existed) or has been evicted +// by the janitor. Either way, the canvas's contract on 404 +// is "give up, surface an error to the user." The handler +// returns 404 with a generic message that does NOT leak +// whether the task_id was once known. 
+func TestGetA2ATask_UnknownReturns404(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-x"}, {Key: "taskId", Value: "00000000-0000-0000-0000-000000000000"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-x/a2a/task/00000000-0000-0000-0000-000000000000", nil) + + handler.GetA2ATask(c) + + if w.Code != http.StatusNotFound { + t.Errorf("expected 404 Not Found, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestGetA2ATask_CrossWorkspaceReturns403 pins the authz +// check. A caller in workspace A polls a task_id that was +// created in workspace B. The handler returns 403 (not 404) +// so the caller can distinguish "wrong workspace" (their +// request is auth-blocked) from "task truly gone" (janitor +// evicted it, no auth concern). The 404-vs-403 distinction +// matters for retry logic: 403 is a permanent auth failure +// (no point retrying); 404 might be a transient eviction +// (retrying with a different task_id is the right move). +func TestGetA2ATask_CrossWorkspaceReturns403(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + h := a2aTaskStore.NewTaskHandle("ws-creator", "message/send") + // Don't complete — pending is fine for this test, the + // authz check runs before the status dispatch. 
+ + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-evil"}, {Key: "taskId", Value: h.ID}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-evil/a2a/task/"+h.ID, nil) + + handler.GetA2ATask(c) + + if w.Code != http.StatusForbidden { + t.Errorf("expected 403 Forbidden, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestGetA2ATask_LateArrivalRace pins the full late-arrival +// flow end-to-end through the HTTP handler. Sequence: +// 1. cap-and-queue creates the handle, returns task_id +// 2. canvas polls: pending (202) +// 3. agent replies: handle.Complete (200 completed) +// 4. canvas polls again: completed (200) +// 5. WS push fires after step 4; second Complete is a +// no-op (idempotent at the store layer) +// 6. canvas polls again: still completed (result unchanged) +func TestGetA2ATask_LateArrivalRace(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + h := a2aTaskStore.NewTaskHandle("ws-race", "message/send") + + // Step 2: poll while pending → 202 + w1 := httptest.NewRecorder() + c1, _ := gin.CreateTestContext(w1) + c1.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}} + c1.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil) + handler.GetA2ATask(c1) + if w1.Code != http.StatusAccepted { + t.Fatalf("step 2 (pending poll): want 202, got %d", w1.Code) + } + + // Step 3: agent replies + first := []byte(`{"result":"first"}`) + if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: first}); err != nil { + t.Fatalf("first Complete: %v", err) + } + + // Step 4: poll after complete → 200 with first result + w2 := httptest.NewRecorder() + c2, _ := gin.CreateTestContext(w2) + c2.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}} + c2.Request = httptest.NewRequest("GET", 
"/workspaces/ws-race/a2a/task/"+h.ID, nil) + handler.GetA2ATask(c2) + if w2.Code != http.StatusOK { + t.Fatalf("step 4 (completed poll): want 200, got %d", w2.Code) + } + var resp2 map[string]interface{} + _ = json.Unmarshal(w2.Body.Bytes(), &resp2) + enc2, _ := resp2["body"].(string) + dec2, _ := base64.StdEncoding.DecodeString(enc2) + if string(dec2) != string(first) { + t.Errorf("step 4 body = %q, want %q", dec2, first) + } + + // Step 5: late-arrival Complete is a no-op + second := []byte(`{"result":"second"}`) + if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: second}); err != ErrTaskAlreadyCompleted { + t.Errorf("step 5 (late Complete): want ErrTaskAlreadyCompleted, got %v", err) + } + + // Step 6: poll again — result must NOT have been overwritten + w3 := httptest.NewRecorder() + c3, _ := gin.CreateTestContext(w3) + c3.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}} + c3.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil) + handler.GetA2ATask(c3) + var resp3 map[string]interface{} + _ = json.Unmarshal(w3.Body.Bytes(), &resp3) + enc3, _ := resp3["body"].(string) + dec3, _ := base64.StdEncoding.DecodeString(enc3) + if string(dec3) != string(first) { + t.Errorf("step 6 (post-late-arrival poll): body = %q, want %q (first result must survive)", dec3, first) + } +} diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index bbcc4f4a4..308f89ee1 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -12,6 +12,7 @@ import ( "encoding/json" "errors" "fmt" + "encoding/base64" "io" "log" "net" @@ -415,10 +416,43 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { detached := context.WithoutCancel(ctx) budget := canvasA2ASyncBudget() // local copy for the time.After below done := make(chan a2aResult, 1) + + // #2818 — pre-create the TaskHandle BEFORE the goroutine 
+ // starts so the canvas can poll for the result even if the + // agent reply is still in flight when the HTTP response is + // flushed. The task_id becomes the primary correlation key + // the canvas uses to recover the result if the WS push is + // missed (5s polling fallback per the design). + // + // The task_id is generic across all A2A methods; we record + // which method raised the task for future dispatch-meta + // surfaces (and the GetA2ATask handler can echo it back). + a2aMethod := extractA2AMethod(body) + taskHandle := a2aTaskStore.NewTaskHandle(workspaceID, a2aMethod) + h.asyncWG.Add(1) go func() { defer h.asyncWG.Done() s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser) + // #2818 — buffer the result into the store so a + // late canvas poll can recover. First terminal + // wins (idempotent CAS — see TaskHandle.Complete). + // The WS broadcast still fires unconditionally + // below; the store is the durable buffer for the + // case where the WS push was missed. + if pe != nil { + _ = taskHandle.Complete(TaskStatusFailed, TaskResult{ + Status: pe.Status, + Body: a2aErrorResponseBody(pe), + ContentType: "application/json", + }) + } else { + _ = taskHandle.Complete(TaskStatusCompleted, TaskResult{ + Status: s, + Body: b, + ContentType: "application/json", + }) + } done <- a2aResult{s, b, pe} }() select { @@ -433,10 +467,22 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { c.Data(r.status, "application/json", r.body) return case <-time.After(budget): - // Outlived CF's edge limit — ack queued; the goroutine finishes and - // the reply lands via WS. The canvas already treats `queued` as - // "still processing" (delivery_mode mirrors poll-mode). - c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"}) + // Outlived CF's edge limit — ack queued (202 + task_id); + // the goroutine finishes and the reply lands via WS. 
The + // canvas correlates the eventual A2A_RESPONSE WS push to + // the task_id (primary key) and falls back to polling + // GET /workspaces/:id/a2a/task/{task_id} if the WS push + // is missed (5s timer per the design). The 202 status + // (not 200) is the new wire shape — older canvases that + // only check for {status:"queued"} still match (the + // body shape is backward-compatible with the legacy + // 200-ack that #2800 shipped). + c.JSON(http.StatusAccepted, gin.H{ + "status": "queued", + "task_id": taskHandle.ID, + "delivery_mode": "push-async", + "method": "message/send", + }) return } } @@ -453,6 +499,108 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { c.Data(status, "application/json", respBody) } +// GetA2ATask handles GET /workspaces/:id/a2a/task/:taskId — the canvas's +// late-arrival recovery path for the cap-and-queue async-dispatch +// contract (#2818, #2751). +// +// Lifecycle: +// - Cap-and-queue fires the budget timer → handler issues a 202 + +// task_id, creates a TaskHandle in `pending` state, the detached +// goroutine continues the agent dispatch. +// - Canvas correlates the eventual A2A_RESPONSE WS push to task_id +// (primary key). +// - If the WS push is missed (drop, hard refresh, late registration), +// the canvas's 5s polling fallback calls THIS endpoint to recover +// the buffered result. The TaskHandle has already moved to +// `completed` or `failed` (the goroutine writes the result via +// TaskHandle.Complete before the WS broadcast fires). +// - 5min after creation the janitor evicts the handle; subsequent +// GETs return 404 (the canvas's contract on a 404 is "give up, +// surface an error to the user"). +// +// Authz: the caller's workspace token must match the handle's +// WorkspaceID. 
The check is the same pattern approvals.Withdraw uses +// (the row's creator, not the path :id) — but for A2A tasks the +// creator IS the row's workspace, and the path's :id is also the +// creator workspace (A2A isn't cross-workspace in the way approval +// gates are). The check is therefore: path :id == handle.WorkspaceID. +// A mismatch returns 403 (caller is not the workspace that raised +// the task; could be a misrouted request or a stale handle from a +// previous deploy). +// +// Response shape (completed): +// +// HTTP/1.1 200 OK +// { "status": "completed", "task_id": "...", "http_status": 200, +// "body": , "content_type": "..." } +// +// Response shape (still pending): +// +// HTTP/1.1 202 Accepted +// { "status": "pending", "task_id": "..." } +// +// Response shape (failed): +// +// HTTP/1.1 200 OK +// { "status": "failed", "task_id": "...", "http_status": , +// "body": , "content_type": "..." } +// +// HTTP 202 is used for "pending" so the canvas can distinguish +// "agent still working, keep polling" from "agent finished, here's +// the result" without reading the body. Body is base64-encoded to +// avoid JSON escaping for arbitrary JSON-RPC payload shapes (some +// upstream encoders produce non-UTF-8 bytes for binary parts). +func (h *WorkspaceHandler) GetA2ATask(c *gin.Context) { + workspaceID := c.Param("id") + taskID := c.Param("taskId") + + handle, err := a2aTaskStore.Get(taskID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "task not found or expired"}) + return + } + + // Authz: path :id must match the handle's WorkspaceID. A + // mismatch is a 403, not a 404, so the caller can distinguish + // "wrong workspace" from "task truly not in the store". + if handle.WorkspaceID != workspaceID { + c.JSON(http.StatusForbidden, gin.H{"error": "task does not belong to this workspace"}) + return + } + + switch handle.Status() { + case TaskStatusPending: + // Still in flight. Canvas keeps polling. 
+ c.JSON(http.StatusAccepted, gin.H{ + "status": "pending", + "task_id": handle.ID, + }) + return + case TaskStatusCompleted, TaskStatusFailed: + result, _ := handle.Result() + c.JSON(http.StatusOK, gin.H{ + "status": string(handle.Status()), + "task_id": handle.ID, + "http_status": result.Status, + "body": base64.StdEncoding.EncodeToString(result.Body), + "content_type": result.ContentType, + }) + return + default: + // Defensive: any future TaskStatus value lands here. + // Returning 500 surfaces the bug to the canvas as a + // transport-style error (the canvas's contract on 5xx + // is "surface to user"); 200 with a weird body would + // be silent corruption. + c.JSON(http.StatusInternalServerError, gin.H{ + "error": "unknown task status", + "status": string(handle.Status()), + "task_id": handle.ID, + }) + return + } +} + // checkWorkspaceBudget returns a proxyA2AError with 402 when the workspace has // exceeded ANY of its configured per-period budget limits (hourly/daily/weekly/ // monthly — see budget_periods.go). Per-period spend is the rolling-window sum @@ -1232,6 +1380,49 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID return ctx, cancel } +// extractA2AMethod is a lightweight, non-mutating JSON parse that pulls +// the `method` field out of a JSON-RPC envelope. Used by the cap-and-queue +// branch (#2818) to record which A2A method raised the task before the +// full normalizeA2APayload runs. Cheap (no allocations beyond the +// interface map), tolerant of malformed input (returns "" on parse +// error, so the task_id is still issued but the Method field is +// empty — the canvas's contract is on task_id, not Method). +// +// NOT a replacement for normalizeA2APayload — that one mutates the +// body (adds jsonrpc envelope, messageId default, role default, +// parts wrap for v0.2→v0.3 compat) and is the authoritative shape +// for the downstream dispatch. extractA2AMethod is just a "what +// method was this?" 
peek for the task metadata. +func extractA2AMethod(body []byte) string { + var payload map[string]interface{} + if err := json.Unmarshal(body, &payload); err != nil { + return "" + } + if m, ok := payload["method"].(string); ok { + return m + } + return "" +} + +// a2aErrorResponseBody synthesizes the JSON response body for a +// proxyA2AError — used by the cap-and-queue goroutine to buffer the +// result into the TaskStore so a late canvas poll can recover the +// error envelope. Mirrors the wire shape c.JSON(pe.Status, pe.Response) +// would have produced in the synchronous path: marshal the gin.H map +// to JSON. Failure to marshal (gin.H contains an unserializable +// value, e.g. a channel) is impossible in practice but defensively +// returns an empty body so the buffer still has a non-nil marker. +func a2aErrorResponseBody(pe *proxyA2AError) []byte { + if pe == nil || pe.Response == nil { + return nil + } + b, err := json.Marshal(pe.Response) + if err != nil { + return nil + } + return b +} + // canvasA2ASyncBudget is the extracted lookup for the cap-and-queue synchronous // wait (core#2751). 
Extracted from the ProxyA2A handler so the default value // can be unit-tested directly without source-string matching — a regression of diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index da1a06cab..a553b9389 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -3009,12 +3009,29 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) { handler.ProxyA2A(c) elapsed := time.Since(start) - if w.Code != http.StatusOK { - t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String()) + if w.Code != http.StatusAccepted { + t.Fatalf("expected 202 Accepted (queued), got %d: %s", w.Code, w.Body.String()) } if !strings.Contains(w.Body.String(), `"queued"`) { t.Errorf("expected queued ack, got: %s", w.Body.String()) } + // #2818 — the new wire shape carries task_id as the + // primary correlation key the canvas uses to recover + // the result via the GET /a2a/task/{task_id} endpoint. + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if resp["status"] != "queued" { + t.Errorf("response.status = %v, want %q", resp["status"], "queued") + } + if resp["delivery_mode"] != "push-async" { + t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "push-async") + } + taskID, _ := resp["task_id"].(string) + if taskID == "" { + t.Errorf("response.task_id missing (required for the late-arrival recovery path)") + } // Returned at ~budget, NOT after the (blocked) agent — proves the cap fired. if elapsed > 2*time.Second { t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed) @@ -3236,9 +3253,10 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) { handler.ProxyA2A(c) elapsed := time.Since(start) - // 1. The HTTP response is the queued ack (not the agent reply). 
- if w.Code != http.StatusOK { - t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String()) + // 1. The HTTP response is the queued ack (202 + task_id, not + // the agent reply). #2818 wire shape. + if w.Code != http.StatusAccepted { + t.Fatalf("expected 202 Accepted (queued), got %d: %s", w.Code, w.Body.String()) } if !strings.Contains(w.Body.String(), `"queued"`) { t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String()) @@ -3246,6 +3264,16 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) { if !strings.Contains(w.Body.String(), `"push-async"`) { t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String()) } + // task_id is the new primary correlation key the canvas + // uses to recover the result via the GET /a2a/task/{task_id} + // endpoint. Without it, the late-arrival fallback is impossible. + var e2eResp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &e2eResp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if e2eResp["task_id"] == nil || e2eResp["task_id"] == "" { + t.Errorf("response.task_id missing (required for the late-arrival recovery path): %s", w.Body.String()) + } // Returned at ~budget, NOT after the (blocked) agent. if elapsed > 300*time.Millisecond { t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed) diff --git a/workspace-server/internal/handlers/a2a_task_store.go b/workspace-server/internal/handlers/a2a_task_store.go new file mode 100644 index 000000000..2eced86ef --- /dev/null +++ b/workspace-server/internal/handlers/a2a_task_store.go @@ -0,0 +1,342 @@ +package handlers + +// a2a_task_store.go — in-memory task store for the canvas cap-and-queue +// async-dispatch contract (core#2751, #2818). 
+// +// The cap-and-queue goroutine acks the canvas with `{status:"queued"}` (now +// 202 + task_id) well before the agent finishes its turn, so the canvas +// gets a prompt reply and the long turn completes on a detached forward +// ctx. The agent's eventual reply lands via the AGENT_MESSAGE WebSocket +// broadcast (the existing poll-mode contract). +// +// That's the happy path. The unhappy paths are what this store protects +// against: +// +// 1. WS push is best-effort — a dropped WebSocket (network blip, canvas +// hard-refresh between push and client registration, proxy restart +// mid-push) loses the reply. The canvas needs a way to recover the +// result without resubmitting the turn. +// +// 2. The cap-and-queue goroutine completes AFTER the canvas HTTP reply +// but BEFORE the client registered the WS handler. The WS push fires +// into the void. +// +// 3. Concurrent Decide / withdraw / kill-restart cycles can interleave +// with the agent's reply, and a second caller reading a half-written +// state would observe a torn result. +// +// The store is the durable buffer for (1) + (2): every cap-and-queue +// ack creates a TaskHandle, the detached goroutine writes the result via +// `Complete` when the agent replies, and the canvas can poll +// `GET /workspaces/:id/a2a/task/{task_id}` to recover. A 5-minute TTL + +// janitor goroutine bounds the memory footprint (5min × peak concurrent +// tasks; with a 90s budget + Cloudflare's 100s edge, the worst-case +// "task in flight" duration is ~30min including the agent turn itself, +// but the canvas can recover within 5min of the queued ack so the TTL +// is sized for that recovery window, not the full agent turn). +// +// Concurrency: package-level singleton (mu sync.RWMutex) — the store is +// pure in-process state, no DB, no per-handler state. A single instance +// per process is the natural shape; if a future deploy needs per-handler +// isolation (e.g. 
// ErrTaskNotFound is the sentinel for a task_id the store cannot resolve.
// It deliberately does not distinguish "never existed" from "no longer
// present", so a caller cannot probe which task_ids were once valid.
var ErrTaskNotFound = errors.New("a2a task: not found")

// ErrTaskAlreadyCompleted is returned by TaskHandle.Complete when the
// handle has already left the pending state. The earlier result is kept;
// the later one is discarded.
var ErrTaskAlreadyCompleted = errors.New("a2a task: already completed")

// TaskStatus is the lifecycle state of a TaskHandle.
type TaskStatus string

const (
	// TaskStatusPending is the initial state: no result has been
	// buffered yet.
	TaskStatusPending TaskStatus = "pending"
	// TaskStatusCompleted is the terminal state for a successful reply.
	TaskStatusCompleted TaskStatus = "completed"
	// TaskStatusFailed is the terminal state for an error result. The
	// buffered Body may be empty if no error envelope was available.
	TaskStatusFailed TaskStatus = "failed"
)

// TaskResult is the buffered outcome of an A2A task: the status code,
// raw body bytes and content type a caller would otherwise have received
// directly.
type TaskResult struct {
	// Status is the HTTP status code of the buffered outcome.
	Status int
	// Body is the raw response body. It may be nil or empty when no body
	// was captured.
	Body []byte
	// ContentType is the Content-Type that goes with Body.
	ContentType string
}

// TaskHandle is the in-memory record for one asynchronously dispatched
// A2A task. It starts out pending, is moved to a terminal state exactly
// once by Complete, and can be read concurrently through Status and
// Result.
//
// A TaskHandle contains a mutex and must not be copied; always use it
// through a pointer.
type TaskHandle struct {
	// ID is the correlation key handed to the client.
	// NOTE(review): presumably a random UUID; the constructor is not
	// part of this block, so confirm there.
	ID string

	// WorkspaceID is the workspace the task was raised against. Readers
	// of the store compare it to the requesting workspace for authz.
	WorkspaceID string

	// Method is the A2A method name (e.g. "message/send") the task was
	// created for.
	Method string

	// CreatedAt is when the handle was created.
	CreatedAt time.Time

	// mu guards status and result.
	mu sync.RWMutex
	// status is the current lifecycle state.
	status TaskStatus
	// result is the buffered outcome; nil until Complete succeeds.
	result *TaskResult
}

// Status returns the current lifecycle state. It is safe to call
// concurrently with Complete and Result.
func (t *TaskHandle) Status() TaskStatus {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.status
}

// Result returns the buffered outcome and true once the handle has left
// the pending state; before that it returns a zero TaskResult and false.
//
// The returned value is fully detached from the handle: Body is a fresh
// copy, so a caller that mutates the returned bytes cannot corrupt what
// later readers see. (A nil Body stays nil.)
func (t *TaskHandle) Result() (TaskResult, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.status == TaskStatusPending || t.result == nil {
		return TaskResult{}, false
	}
	out := *t.result
	if out.Body != nil {
		// A struct copy alone would still share the backing array.
		out.Body = append([]byte(nil), out.Body...)
	}
	return out, true
}

// Complete moves the handle out of the pending state and buffers result.
// The first call wins: any later call returns ErrTaskAlreadyCompleted and
// leaves the buffered result untouched.
//
// The check-then-set runs under the write lock, so when several
// goroutines call Complete concurrently exactly one succeeds.
func (t *TaskHandle) Complete(status TaskStatus, result TaskResult) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.status != TaskStatusPending {
		return ErrTaskAlreadyCompleted
	}
	t.status = status
	// Defensive copy so a caller mutating its own slice after Complete
	// returns cannot change the buffered result.
	copied := result
	if result.Body != nil {
		copied.Body = append([]byte(nil), result.Body...)
	}
	t.result = &copied
	return nil
}

// A2ATaskStore is the in-memory task store: a mutex-guarded map from
// task_id to handle, plus the ticker, stop channel and Once used by its
// background janitor.
type A2ATaskStore struct {
	mu      sync.RWMutex
	tasks   map[string]*TaskHandle
	janitor *time.Ticker
	stop    chan struct{}
	once    sync.Once
}
5min is the canvas +// recovery window per the design (#2818): the WS push is +// best-effort, the poll fallback is 5s, so a canvas that misses +// both the push AND the first poll can still recover via +// repeated polls for up to 5min before the store evicts the +// handle. After that, the agent reply is lost to the canvas +// (still in the activity_logs / events history for the agent +// owner, just not surfaceable via the task_id correlation key). +// +// 5min is NOT the agent turn timeout — a 30min agent turn can +// still be in flight at the 5min mark, and the result is then +// delivered via WS only (the canvas's only hope of seeing it +// is the push). The TTL is a recovery-window bound, not an +// agent-runtime bound. +const a2aTaskStoreTTL = 5 * time.Minute + +// a2aTaskStoreJanitorInterval is how often the janitor scans +// for expired tasks. 60s is a fine balance: fast enough that +// the map doesn't grow unbounded under steady load, slow +// enough that the scan isn't a hot path. +const a2aTaskStoreJanitorInterval = 60 * time.Second + +// a2aTaskStore is the package-level singleton. Initialized +// lazily on first NewTaskHandle call so tests that don't +// touch the store don't pay the janitor goroutine cost. +var a2aTaskStore = &A2ATaskStore{ + tasks: make(map[string]*TaskHandle), + stop: make(chan struct{}), +} + +// init starts the janitor on package import. The cost is one +// goroutine per process; the benefit is that the store is +// always ready when ProxyA2A first fires. Tests that don't +// touch the store still pay the goroutine cost — acceptable +// because the janitor only fires every 60s and does an O(N) +// scan of a map that's almost always empty. +func init() { + a2aTaskStore.once.Do(func() { + a2aTaskStore.janitor = time.NewTicker(a2aTaskStoreJanitorInterval) + go a2aTaskStore.runJanitor() + }) +} + +// runJanitor is the periodic eviction loop. 
Runs forever until +// process exit; no shutdown path because the process is +// short-lived (the platform restarts on deploy, and the OS +// reclaims the map on exit). +func (s *A2ATaskStore) runJanitor() { + for { + select { + case <-s.janitor.C: + s.pruneExpired() + case <-s.stop: + return + } + } +} + +// pruneExpired evicts handles older than a2aTaskStoreTTL. +// O(N) over the map; N is bounded by the peak concurrent +// in-flight tasks (typically <100 even on a busy platform). +// The take-and-release lock pattern (copy keys under read +// lock, delete under write lock) is the standard Go map +// iteration under lock idiom — delete-during-iteration +// would invalidate the cursor. +func (s *A2ATaskStore) pruneExpired() { + cutoff := time.Now().Add(-a2aTaskStoreTTL) + var expired []string + s.mu.RLock() + for id, t := range s.tasks { + if t.CreatedAt.Before(cutoff) { + expired = append(expired, id) + } + } + s.mu.RUnlock() + if len(expired) == 0 { + return + } + s.mu.Lock() + for _, id := range expired { + delete(s.tasks, id) + } + s.mu.Unlock() +} + +// NewTaskHandle creates a new pending TaskHandle, inserts it +// into the store, and returns it. The caller (ProxyA2A's +// cap-and-queue branch) embeds the returned ID in the 202 +// response so the canvas can correlate. +func (s *A2ATaskStore) NewTaskHandle(workspaceID, method string) *TaskHandle { + t := &TaskHandle{ + ID: uuid.NewString(), + WorkspaceID: workspaceID, + Method: method, + CreatedAt: time.Now(), + status: TaskStatusPending, + } + s.mu.Lock() + s.tasks[t.ID] = t + s.mu.Unlock() + return t +} + +// Get retrieves a TaskHandle by ID. Returns ErrTaskNotFound +// for expired or never-existed handles. The authz check +// (workspace_id match) is the caller's responsibility — +// Get returns the handle regardless of workspace, the +// handler then checks handle.WorkspaceID against the +// caller's workspace. 
func (s *A2ATaskStore) Get(taskID string) (*TaskHandle, error) {
	// Only the map lookup happens under the store's read lock; the
	// lock is released before the result is inspected or returned.
	s.mu.RLock()
	t, ok := s.tasks[taskID]
	s.mu.RUnlock()
	if !ok {
		return nil, ErrTaskNotFound
	}
	// The returned pointer is the live handle, not a copy: its
	// status/result are guarded by the handle's own mutex, and it
	// stays usable by the caller even if the janitor later removes
	// it from the map.
	return t, nil
}
diff --git a/workspace-server/internal/handlers/a2a_task_store_test.go b/workspace-server/internal/handlers/a2a_task_store_test.go new file mode 100644 index 000000000..0709af0f4 --- /dev/null +++ b/workspace-server/internal/handlers/a2a_task_store_test.go @@ -0,0 +1,315 @@
package handlers

// a2a_task_store_test.go — unit tests for the in-memory task store
// (core#2751 / #2818 async-dispatch contract). The store is the
// in-process buffer for the cap-and-queue's late-arrival path: a
// canvas that misses the WS push polls the store via the
// GET /workspaces/:id/a2a/task/{task_id} endpoint to recover
// the buffered result. Tests pin:
//   - Lifecycle: NewTaskHandle → Complete → Get round-trip
//   - Concurrency: parallel Complete calls (only one wins — the
//     first terminal result is the one that is kept)
//   - Late-arrival race: Get after Complete returns the buffered
//     result; a second Complete is a no-op
//   - TTL eviction: handles older than a2aTaskStoreTTL are pruned
//   - Error paths: ErrTaskNotFound for expired/missing,
//     ErrTaskAlreadyCompleted for double-Complete

import (
	"bytes"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// TestTaskStore_NewPendingHandle_GeneratesUniqueIDs pins the
// contract that two handles created back-to-back from the same
// store get different IDs, and that a new handle starts out
// pending with the workspace ID and method it was created with.
+func TestTaskStore_NewPendingHandle_GeneratesUniqueIDs(t *testing.T) { + s := newIsolatedTaskStore(t) + h1 := s.NewTaskHandle("ws-1", "message/send") + h2 := s.NewTaskHandle("ws-1", "message/send") + if h1.ID == h2.ID { + t.Errorf("expected unique IDs, got %q twice", h1.ID) + } + if h1.Status() != TaskStatusPending { + t.Errorf("new handle should be pending, got %q", h1.Status()) + } + if h1.WorkspaceID != "ws-1" { + t.Errorf("WorkspaceID = %q, want ws-1", h1.WorkspaceID) + } + if h1.Method != "message/send" { + t.Errorf("Method = %q, want message/send", h1.Method) + } +} + +// TestTaskStore_Complete_StoresResult — happy path: complete +// moves the handle out of pending, Result returns the buffered +// shape. +func TestTaskStore_Complete_StoresResult(t *testing.T) { + s := newIsolatedTaskStore(t) + h := s.NewTaskHandle("ws-1", "message/send") + body := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hi"}}`) + + if err := h.Complete(TaskStatusCompleted, TaskResult{ + Status: 200, + Body: body, + ContentType: "application/json", + }); err != nil { + t.Fatalf("first Complete: %v", err) + } + if got := h.Status(); got != TaskStatusCompleted { + t.Errorf("Status after Complete = %q, want completed", got) + } + result, ok := h.Result() + if !ok { + t.Fatal("Result should be available after Complete") + } + if result.Status != 200 { + t.Errorf("Result.Status = %d, want 200", result.Status) + } + if !bytes.Equal(result.Body, body) { + t.Errorf("Result.Body = %q, want %q", result.Body, body) + } + if result.ContentType != "application/json" { + t.Errorf("Result.ContentType = %q, want application/json", result.ContentType) + } +} + +// TestTaskStore_Complete_Idempotent — the load-bearing +// contract pin. A second Complete on an already-terminal +// handle returns ErrTaskAlreadyCompleted AND does NOT +// overwrite the buffered result. 
This is what protects +// the late-arrival race: a WS push that fires after a +// poll already recovered the result is a no-op at the +// store layer; the first Complete wins. +func TestTaskStore_Complete_Idempotent(t *testing.T) { + s := newIsolatedTaskStore(t) + h := s.NewTaskHandle("ws-1", "message/send") + + first := []byte(`{"result":"first"}`) + second := []byte(`{"result":"second"}`) + + if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: first}); err != nil { + t.Fatalf("first Complete: %v", err) + } + err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: second}) + if err != ErrTaskAlreadyCompleted { + t.Errorf("second Complete: want ErrTaskAlreadyCompleted, got %v", err) + } + // First result survives. + result, ok := h.Result() + if !ok { + t.Fatal("Result should be available after first Complete") + } + if !bytes.Equal(result.Body, first) { + t.Errorf("second Complete overwrote result: got %q, want %q", result.Body, first) + } +} + +// TestTaskStore_Complete_ConcurrentFirstWins — the concurrent +// version of the idempotency contract. Multiple goroutines +// race to Complete the same handle; exactly one wins, the +// others get ErrTaskAlreadyCompleted, and the surviving +// result is the one from the winning goroutine. 
+func TestTaskStore_Complete_ConcurrentFirstWins(t *testing.T) { + s := newIsolatedTaskStore(t) + h := s.NewTaskHandle("ws-1", "message/send") + + const N = 50 + var winners atomic.Int32 + var wg sync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + i := i + go func() { + defer wg.Done() + body := []byte(`{"id":` + itoaForTest(i) + `}`) + if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: body}); err == nil { + winners.Add(1) + } else if err != ErrTaskAlreadyCompleted { + t.Errorf("goroutine %d: unexpected error %v", i, err) + } + }() + } + wg.Wait() + if got := winners.Load(); got != 1 { + t.Errorf("expected exactly 1 winner, got %d", got) + } + // Result is the winning goroutine's body — we don't know + // which one, but it must be non-nil and parseable as the + // shape Complete wrote. + result, ok := h.Result() + if !ok { + t.Fatal("Result should be available after concurrent Complete") + } + if len(result.Body) == 0 { + t.Error("winning Complete produced empty body") + } +} + +// TestTaskStore_Get_UnknownTaskID — the basic error path. +// Get for a task_id that never existed returns +// ErrTaskNotFound. +func TestTaskStore_Get_UnknownTaskID(t *testing.T) { + s := newIsolatedTaskStore(t) + if _, err := s.Get("00000000-0000-0000-0000-000000000000"); err != ErrTaskNotFound { + t.Errorf("Get unknown: want ErrTaskNotFound, got %v", err) + } +} + +// TestTaskStore_Prune_RemovesExpired — the TTL eviction +// contract. A handle older than a2aTaskStoreTTL is removed +// from the store on the next pruneExpired call. Test +// bypasses the real timer by calling pruneExpired +// directly with a synthetically-aged handle. +func TestTaskStore_Prune_RemovesExpired(t *testing.T) { + s := newIsolatedTaskStore(t) + h := s.NewTaskHandle("ws-1", "message/send") + + // Backdate the handle past the TTL. 
+ h.CreatedAt = time.Now().Add(-2 * a2aTaskStoreTTL) + + s.pruneExpired() + + if _, err := s.Get(h.ID); err != ErrTaskNotFound { + t.Errorf("expected expired handle to be pruned, got %v", err) + } +} + +// TestTaskStore_LateArrivalRace — the contract-critical +// integration test for #2818. The full flow: +// 1. cap-and-queue creates the handle, returns task_id +// 2. agent is slow; canvas polls via the GET endpoint +// while the handle is still pending → 202 +// 3. agent replies; goroutine Completes the handle +// 4. canvas polls again → 200 with the buffered result +// 5. WS push fires after the poll already recovered +// the result; the late-arrival Complete is a no-op +// (ErrTaskAlreadyCompleted) +func TestTaskStore_LateArrivalRace(t *testing.T) { + s := newIsolatedTaskStore(t) + h := s.NewTaskHandle("ws-1", "message/send") + + // Step 2: poll while pending. + if h.Status() != TaskStatusPending { + t.Fatalf("expected pending, got %q", h.Status()) + } + if _, ok := h.Result(); ok { + t.Fatal("Result should not be available before Complete") + } + + // Step 3: agent replies. + reply := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hello"}}`) + if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: reply}); err != nil { + t.Fatalf("Complete: %v", err) + } + + // Step 4: poll after Complete returns the buffered result. + result, ok := h.Result() + if !ok { + t.Fatal("Result should be available after Complete") + } + if !bytes.Equal(result.Body, reply) { + t.Errorf("Result.Body = %q, want %q", result.Body, reply) + } + + // Step 5: a late-arrival Complete (e.g. WS push firing + // after the canvas's poll already recovered) is a + // no-op — the second call returns ErrTaskAlreadyCompleted + // and does NOT overwrite the first result. 
+ if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: []byte(`{"late":"winner"}`)}); err != ErrTaskAlreadyCompleted { + t.Errorf("late-arrival Complete: want ErrTaskAlreadyCompleted, got %v", err) + } + result2, _ := h.Result() + if !bytes.Equal(result2.Body, reply) { + t.Errorf("late-arrival Complete overwrote result: got %q, want %q", result2.Body, reply) + } +} + +// TestTaskStore_ConcurrentAccess — many goroutines +// creating + completing + getting handles, no data race. +// Run with `go test -race` to catch the race detector. +func TestTaskStore_ConcurrentAccess(t *testing.T) { + s := newIsolatedTaskStore(t) + + const N = 100 + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + h := s.NewTaskHandle("ws-1", "message/send") + _ = h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: []byte(`ok`)}) + _, _ = s.Get(h.ID) + }() + } + wg.Wait() +} + +// TestExtractA2AMethod pins the lightweight JSON peek used +// by the cap-and-queue branch to record which A2A method +// raised the task. Tolerant of malformed input (returns +// "" on parse error so the task_id is still issued). 
+func TestExtractA2AMethod(t *testing.T) { + cases := []struct { + name string + body string + want string + }{ + {"message/send", `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{}}`, "message/send"}, + {"message/stream", `{"method":"message/stream"}`, "message/stream"}, + {"initialize", `{"method":"initialize"}`, "initialize"}, + {"missing-method", `{"jsonrpc":"2.0","id":"1","params":{}}`, ""}, + {"non-string-method", `{"method":42}`, ""}, + {"malformed", `{"method":`, ""}, + {"empty", ``, ""}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := extractA2AMethod([]byte(c.body)); got != c.want { + t.Errorf("extractA2AMethod(%q) = %q, want %q", c.body, got, c.want) + } + }) + } +} + +// newIsolatedTaskStore creates a fresh A2ATaskStore for the +// test, replacing the package-level singleton. Avoids the +// real janitor goroutine and the shared-state bleed that +// would happen if two tests used the same store. +func newIsolatedTaskStore(t *testing.T) *A2ATaskStore { + t.Helper() + return &A2ATaskStore{ + tasks: make(map[string]*TaskHandle), + stop: make(chan struct{}), + } +} + +// itoa is a tiny no-allocation integer-to-string for the +// concurrent test (faster than strconv.Itoa in a hot loop). +// Defined in a way that doesn't collide with class1_ast_gate_test.go +// (which has its own itoa) by using a per-test wrapper. 
+func itoaForTest(i int) string { + if i == 0 { + return "0" + } + neg := i < 0 + if neg { + i = -i + } + var buf [20]byte + pos := len(buf) + for i > 0 { + pos-- + buf[pos] = byte('0' + i%10) + i /= 10 + } + if neg { + pos-- + buf[pos] = '-' + } + return string(buf[pos:]) +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 1dfb23708..d008b4f5b 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -250,6 +250,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // org-level token). Existence-non-inferring 404 on auth failure. r.GET("/workspaces/:id/a2a/queue/:queue_id", wh.GetA2AQueueStatus) + // A2A task lookup (#2818) — the late-arrival recovery path for + // the cap-and-queue async-dispatch contract. Canvas polls this + // every 5s when the WS push is missed, looking for the buffered + // result. Same routing rationale as /a2a/queue/:queue_id (auth + // is per-handler against the task's WorkspaceID). + r.GET("/workspaces/:id/a2a/task/:taskId", wh.GetA2ATask) + // Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a. // Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13 // by requiring a valid bearer token for any workspace that has one on file.