fix(workspace-server#2818 Phase 1+2): task_id correlation key + late-arrival recovery #2852

Closed
agent-dev-b wants to merge 1 commits from fix/2818-async-dispatch-202-taskid into main
6 changed files with 1179 additions and 9 deletions
@@ -0,0 +1,287 @@
package handlers
// a2a_get_task_test.go — integration tests for the GetA2ATask
// HTTP handler (#2818, #2751). The handler is the canvas's
// late-arrival recovery path: when the WS push is missed, the
// canvas polls GET /workspaces/:id/a2a/task/{task_id} every
// 5s to recover the buffered result. Tests pin:
// - Pending task: 202 + {status:"pending", task_id} (canvas
// keeps polling)
// - Completed task: 200 + buffered result body (canvas
// synthesizes the response and renders the agent reply)
// - Failed task: 200 + buffered error envelope
// - Unknown task_id: 404 (janitor-evicted or never-existed)
// - Cross-workspace: 403 (caller is not the task's creator)
// - Late-arrival race: Complete after a poll-recovery is a
// no-op (TaskStore-level idempotency)
import (
"encoding/base64"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/gin-gonic/gin"
)
// TestGetA2ATask_PendingReturns202 is the canvas's "keep
// polling" signal: the task is still in flight, no result is
// available yet, the canvas should keep its 5s poll timer
// running. HTTP 202 (not 200) so the canvas can distinguish
// "agent still working" from "agent finished" without reading
// the body — the body is a small {status,task_id} envelope.
func TestGetA2ATask_PendingReturns202(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// Use the package-level store (initialized via init()) so
// the test sees the real janitor/wiring.
h := a2aTaskStore.NewTaskHandle("ws-pending", "message/send")
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-pending"}, {Key: "taskId", Value: h.ID}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-pending/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c)
if w.Code != http.StatusAccepted {
t.Fatalf("expected 202 Accepted for pending, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if resp["status"] != "pending" {
t.Errorf("status = %v, want pending", resp["status"])
}
if resp["task_id"] != h.ID {
t.Errorf("task_id = %v, want %v", resp["task_id"], h.ID)
}
}
// TestGetA2ATask_CompletedReturns200WithBody is the canvas's
// "agent finished, here's the result" signal. The body is
// base64-encoded to preserve the exact bytes of the original
// JSON-RPC response (the canvas decodes + re-parses).
func TestGetA2ATask_CompletedReturns200WithBody(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h := a2aTaskStore.NewTaskHandle("ws-done", "message/send")
reply := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hello"}}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{
Status: 200,
Body: reply,
ContentType: "application/json",
}); err != nil {
t.Fatalf("Complete: %v", err)
}
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-done"}, {Key: "taskId", Value: h.ID}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-done/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 OK for completed, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if resp["status"] != "completed" {
t.Errorf("status = %v, want completed", resp["status"])
}
if got, _ := resp["http_status"].(float64); int(got) != 200 {
t.Errorf("http_status = %v, want 200", resp["http_status"])
}
encoded, _ := resp["body"].(string)
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
t.Fatalf("body is not valid base64: %v", err)
}
if string(decoded) != string(reply) {
t.Errorf("decoded body = %q, want %q", decoded, reply)
}
}
// TestGetA2ATask_FailedReturns200WithError pins the error
// path: the agent's dispatch returned a non-2xx / transport
// error. The handler returns 200 with status:"failed" and
// the buffered error envelope (the canvas treats this as
// "agent replied, but with an error" — the same shape as a
// synchronous 4xx/5xx).
func TestGetA2ATask_FailedReturns200WithError(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h := a2aTaskStore.NewTaskHandle("ws-err", "message/send")
errEnv := []byte(`{"error":"agent timeout"}`)
if err := h.Complete(TaskStatusFailed, TaskResult{
Status: http.StatusGatewayTimeout,
Body: errEnv,
ContentType: "application/json",
}); err != nil {
t.Fatalf("Complete: %v", err)
}
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-err"}, {Key: "taskId", Value: h.ID}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-err/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 OK (with error envelope in body) for failed, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if resp["status"] != "failed" {
t.Errorf("status = %v, want failed", resp["status"])
}
if got, _ := resp["http_status"].(float64); int(got) != http.StatusGatewayTimeout {
t.Errorf("http_status = %v, want 504", resp["http_status"])
}
encoded, _ := resp["body"].(string)
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
t.Fatalf("body is not valid base64: %v", err)
}
if !strings.Contains(string(decoded), "agent timeout") {
t.Errorf("decoded body = %q, want substring %q", decoded, "agent timeout")
}
}
// TestGetA2ATask_UnknownReturns404 pins the 404 contract:
// the task_id is unknown (never existed) or has been evicted
// by the janitor. Either way, the canvas's contract on 404
// is "give up, surface an error to the user." The handler
// returns 404 with a generic message that does NOT leak
// whether the task_id was once known.
func TestGetA2ATask_UnknownReturns404(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-x"}, {Key: "taskId", Value: "00000000-0000-0000-0000-000000000000"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-x/a2a/task/00000000-0000-0000-0000-000000000000", nil)
handler.GetA2ATask(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404 Not Found, got %d: %s", w.Code, w.Body.String())
}
}
// TestGetA2ATask_CrossWorkspaceReturns403 pins the authz
// check. A caller in workspace A polls a task_id that was
// created in workspace B. The handler returns 403 (not 404)
// so the caller can distinguish "wrong workspace" (their
// request is auth-blocked) from "task truly gone" (janitor
// evicted it, no auth concern). The 404-vs-403 distinction
// matters for retry logic: 403 is a permanent auth failure
// (no point retrying); 404 might be a transient eviction
// (retrying with a different task_id is the right move).
func TestGetA2ATask_CrossWorkspaceReturns403(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h := a2aTaskStore.NewTaskHandle("ws-creator", "message/send")
// Don't complete — pending is fine for this test, the
// authz check runs before the status dispatch.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-evil"}, {Key: "taskId", Value: h.ID}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-evil/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c)
if w.Code != http.StatusForbidden {
t.Errorf("expected 403 Forbidden, got %d: %s", w.Code, w.Body.String())
}
}
// TestGetA2ATask_LateArrivalRace pins the full late-arrival
// flow end-to-end through the HTTP handler. Sequence:
// 1. cap-and-queue creates the handle, returns task_id
// 2. canvas polls: pending (202)
// 3. agent replies: handle.Complete (200 completed)
// 4. canvas polls again: completed (200)
// 5. WS push fires after step 4; second Complete is a
// no-op (idempotent at the store layer)
// 6. canvas polls again: still completed (result unchanged)
func TestGetA2ATask_LateArrivalRace(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h := a2aTaskStore.NewTaskHandle("ws-race", "message/send")
// Step 2: poll while pending → 202
w1 := httptest.NewRecorder()
c1, _ := gin.CreateTestContext(w1)
c1.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}}
c1.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c1)
if w1.Code != http.StatusAccepted {
t.Fatalf("step 2 (pending poll): want 202, got %d", w1.Code)
}
// Step 3: agent replies
first := []byte(`{"result":"first"}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: first}); err != nil {
t.Fatalf("first Complete: %v", err)
}
// Step 4: poll after complete → 200 with first result
w2 := httptest.NewRecorder()
c2, _ := gin.CreateTestContext(w2)
c2.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}}
c2.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c2)
if w2.Code != http.StatusOK {
t.Fatalf("step 4 (completed poll): want 200, got %d", w2.Code)
}
var resp2 map[string]interface{}
_ = json.Unmarshal(w2.Body.Bytes(), &resp2)
enc2, _ := resp2["body"].(string)
dec2, _ := base64.StdEncoding.DecodeString(enc2)
if string(dec2) != string(first) {
t.Errorf("step 4 body = %q, want %q", dec2, first)
}
// Step 5: late-arrival Complete is a no-op
second := []byte(`{"result":"second"}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: second}); err != ErrTaskAlreadyCompleted {
t.Errorf("step 5 (late Complete): want ErrTaskAlreadyCompleted, got %v", err)
}
// Step 6: poll again — result must NOT have been overwritten
w3 := httptest.NewRecorder()
c3, _ := gin.CreateTestContext(w3)
c3.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}}
c3.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil)
handler.GetA2ATask(c3)
var resp3 map[string]interface{}
_ = json.Unmarshal(w3.Body.Bytes(), &resp3)
enc3, _ := resp3["body"].(string)
dec3, _ := base64.StdEncoding.DecodeString(enc3)
if string(dec3) != string(first) {
t.Errorf("step 6 (post-late-arrival poll): body = %q, want %q (first result must survive)", dec3, first)
}
}
+195 -4
View File
@@ -12,6 +12,7 @@ import (
"encoding/json"
"errors"
"fmt"
"encoding/base64"
"io"
"log"
"net"
@@ -415,10 +416,43 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
detached := context.WithoutCancel(ctx)
budget := canvasA2ASyncBudget() // local copy for the time.After below
done := make(chan a2aResult, 1)
// #2818 — pre-create the TaskHandle BEFORE the goroutine
// starts so the canvas can poll for the result even if the
// agent reply is still in flight when the HTTP response is
// flushed. The task_id becomes the primary correlation key
// the canvas uses to recover the result if the WS push is
// missed (5s polling fallback per the design).
//
// The task_id is generic across all A2A methods; we record
// which method raised the task for future dispatch-meta
// surfaces (and the GetA2ATask handler can echo it back).
a2aMethod := extractA2AMethod(body)
taskHandle := a2aTaskStore.NewTaskHandle(workspaceID, a2aMethod)
h.asyncWG.Add(1)
go func() {
defer h.asyncWG.Done()
s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser)
// #2818 — buffer the result into the store so a
// late canvas poll can recover. First terminal
// wins (idempotent CAS — see TaskHandle.Complete).
// The WS broadcast still fires unconditionally
// below; the store is the durable buffer for the
// case where the WS push was missed.
if pe != nil {
_ = taskHandle.Complete(TaskStatusFailed, TaskResult{
Status: pe.Status,
Body: a2aErrorResponseBody(pe),
ContentType: "application/json",
})
} else {
_ = taskHandle.Complete(TaskStatusCompleted, TaskResult{
Status: s,
Body: b,
ContentType: "application/json",
})
}
done <- a2aResult{s, b, pe}
}()
select {
@@ -433,10 +467,22 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
c.Data(r.status, "application/json", r.body)
return
case <-time.After(budget):
// Outlived CF's edge limit — ack queued; the goroutine finishes and
// the reply lands via WS. The canvas already treats `queued` as
// "still processing" (delivery_mode mirrors poll-mode).
c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"})
// Outlived CF's edge limit — ack queued (202 + task_id);
// the goroutine finishes and the reply lands via WS. The
// canvas correlates the eventual A2A_RESPONSE WS push to
// the task_id (primary key) and falls back to polling
// GET /workspaces/:id/a2a/task/{task_id} if the WS push
// is missed (5s timer per the design). The 202 status
// (not 200) is the new wire shape — older canvases that
// only check for {status:"queued"} still match (the
// body shape is backward-compatible with the legacy
// 200-ack that #2800 shipped).
c.JSON(http.StatusAccepted, gin.H{
"status": "queued",
"task_id": taskHandle.ID,
"delivery_mode": "push-async",
"method": "message/send",
})
return
}
}
@@ -453,6 +499,108 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
c.Data(status, "application/json", respBody)
}
// GetA2ATask handles GET /workspaces/:id/a2a/task/:taskId — the canvas's
// late-arrival recovery path for the cap-and-queue async-dispatch
// contract (#2818, #2751).
//
// Lifecycle:
// - Cap-and-queue fires the budget timer → handler issues a 202 +
// task_id, creates a TaskHandle in `pending` state, the detached
// goroutine continues the agent dispatch.
// - Canvas correlates the eventual A2A_RESPONSE WS push to task_id
// (primary key).
// - If the WS push is missed (drop, hard refresh, late registration),
// the canvas's 5s polling fallback calls THIS endpoint to recover
// the buffered result. The TaskHandle has already moved to
// `completed` or `failed` (the goroutine writes the result via
// TaskHandle.Complete before the WS broadcast fires).
// - 5min after creation the janitor evicts the handle; subsequent
// GETs return 404 (the canvas's contract on a 404 is "give up,
// surface an error to the user").
//
// Authz: the caller's workspace token must match the handle's
// WorkspaceID. The check is the same pattern approvals.Withdraw uses
// (the row's creator, not the path :id) — but for A2A tasks the
// creator IS the row's workspace, and the path's :id is also the
// creator workspace (A2A isn't cross-workspace in the way approval
// gates are). The check is therefore: path :id == handle.WorkspaceID.
// A mismatch returns 403 (caller is not the workspace that raised
// the task; could be a misrouted request or a stale handle from a
// previous deploy).
//
// Response shape (completed):
//
// HTTP/1.1 200 OK
// { "status": "completed", "task_id": "...", "http_status": 200,
// "body": <base64 of original response body>, "content_type": "..." }
//
// Response shape (still pending):
//
// HTTP/1.1 202 Accepted
// { "status": "pending", "task_id": "..." }
//
// Response shape (failed):
//
// HTTP/1.1 200 OK
// { "status": "failed", "task_id": "...", "http_status": <error code>,
// "body": <base64 of error envelope>, "content_type": "..." }
//
// HTTP 202 is used for "pending" so the canvas can distinguish
// "agent still working, keep polling" from "agent finished, here's
// the result" without reading the body. Body is base64-encoded to
// avoid JSON escaping for arbitrary JSON-RPC payload shapes (some
// upstream encoders produce non-UTF-8 bytes for binary parts).
func (h *WorkspaceHandler) GetA2ATask(c *gin.Context) {
workspaceID := c.Param("id")
taskID := c.Param("taskId")
handle, err := a2aTaskStore.Get(taskID)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "task not found or expired"})
return
}
// Authz: path :id must match the handle's WorkspaceID. A
// mismatch is a 403, not a 404, so the caller can distinguish
// "wrong workspace" from "task truly not in the store".
if handle.WorkspaceID != workspaceID {
c.JSON(http.StatusForbidden, gin.H{"error": "task does not belong to this workspace"})
return
}
switch handle.Status() {
case TaskStatusPending:
// Still in flight. Canvas keeps polling.
c.JSON(http.StatusAccepted, gin.H{
"status": "pending",
"task_id": handle.ID,
})
return
case TaskStatusCompleted, TaskStatusFailed:
result, _ := handle.Result()
c.JSON(http.StatusOK, gin.H{
"status": string(handle.Status()),
"task_id": handle.ID,
"http_status": result.Status,
"body": base64.StdEncoding.EncodeToString(result.Body),
"content_type": result.ContentType,
})
return
default:
// Defensive: any future TaskStatus value lands here.
// Returning 500 surfaces the bug to the canvas as a
// transport-style error (the canvas's contract on 5xx
// is "surface to user"); 200 with a weird body would
// be silent corruption.
c.JSON(http.StatusInternalServerError, gin.H{
"error": "unknown task status",
"status": string(handle.Status()),
"task_id": handle.ID,
})
return
}
}
// checkWorkspaceBudget returns a proxyA2AError with 402 when the workspace has
// exceeded ANY of its configured per-period budget limits (hourly/daily/weekly/
// monthly — see budget_periods.go). Per-period spend is the rolling-window sum
@@ -1232,6 +1380,49 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
return ctx, cancel
}
// extractA2AMethod is a lightweight, non-mutating JSON parse that pulls
// the `method` field out of a JSON-RPC envelope. Used by the cap-and-queue
// branch (#2818) to record which A2A method raised the task before the
// full normalizeA2APayload runs. Cheap (no allocations beyond the
// interface map), tolerant of malformed input (returns "" on parse
// error, so the task_id is still issued but the Method field is
// empty — the canvas's contract is on task_id, not Method).
//
// NOT a replacement for normalizeA2APayload — that one mutates the
// body (adds jsonrpc envelope, messageId default, role default,
// parts wrap for v0.2→v0.3 compat) and is the authoritative shape
// for the downstream dispatch. extractA2AMethod is just a "what
// method was this?" peek for the task metadata.
func extractA2AMethod(body []byte) string {
var payload map[string]interface{}
if err := json.Unmarshal(body, &payload); err != nil {
return ""
}
if m, ok := payload["method"].(string); ok {
return m
}
return ""
}
// a2aErrorResponseBody synthesizes the JSON response body for a
// proxyA2AError — used by the cap-and-queue goroutine to buffer the
// result into the TaskStore so a late canvas poll can recover the
// error envelope. Mirrors the wire shape c.JSON(pe.Status, pe.Response)
// would have produced in the synchronous path: marshal the gin.H map
// to JSON. Failure to marshal (gin.H contains an unserializable
// value, e.g. a channel) is impossible in practice but defensively
// returns an empty body so the buffer still has a non-nil marker.
func a2aErrorResponseBody(pe *proxyA2AError) []byte {
if pe == nil || pe.Response == nil {
return nil
}
b, err := json.Marshal(pe.Response)
if err != nil {
return nil
}
return b
}
// canvasA2ASyncBudget is the extracted lookup for the cap-and-queue synchronous
// wait (core#2751). Extracted from the ProxyA2A handler so the default value
// can be unit-tested directly without source-string matching — a regression of
@@ -3009,12 +3009,29 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
handler.ProxyA2A(c)
elapsed := time.Since(start)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
if w.Code != http.StatusAccepted {
t.Fatalf("expected 202 Accepted (queued), got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), `"queued"`) {
t.Errorf("expected queued ack, got: %s", w.Body.String())
}
// #2818 — the new wire shape carries task_id as the
// primary correlation key the canvas uses to recover
// the result via the GET /a2a/task/{task_id} endpoint.
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp["status"] != "queued" {
t.Errorf("response.status = %v, want %q", resp["status"], "queued")
}
if resp["delivery_mode"] != "push-async" {
t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "push-async")
}
taskID, _ := resp["task_id"].(string)
if taskID == "" {
t.Errorf("response.task_id missing (required for the late-arrival recovery path)")
}
// Returned at ~budget, NOT after the (blocked) agent — proves the cap fired.
if elapsed > 2*time.Second {
t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed)
@@ -3236,9 +3253,10 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
handler.ProxyA2A(c)
elapsed := time.Since(start)
// 1. The HTTP response is the queued ack (not the agent reply).
if w.Code != http.StatusOK {
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
// 1. The HTTP response is the queued ack (202 + task_id, not
// the agent reply). #2818 wire shape.
if w.Code != http.StatusAccepted {
t.Fatalf("expected 202 Accepted (queued), got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), `"queued"`) {
t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String())
@@ -3246,6 +3264,16 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
if !strings.Contains(w.Body.String(), `"push-async"`) {
t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String())
}
// task_id is the new primary correlation key the canvas
// uses to recover the result via the GET /a2a/task/{task_id}
// endpoint. Without it, the late-arrival fallback is impossible.
var e2eResp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &e2eResp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if e2eResp["task_id"] == nil || e2eResp["task_id"] == "" {
t.Errorf("response.task_id missing (required for the late-arrival recovery path): %s", w.Body.String())
}
// Returned at ~budget, NOT after the (blocked) agent.
if elapsed > 300*time.Millisecond {
t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed)
@@ -0,0 +1,342 @@
package handlers
// a2a_task_store.go — in-memory task store for the canvas cap-and-queue
// async-dispatch contract (core#2751, #2818).
//
// The cap-and-queue goroutine acks the canvas with `{status:"queued"}` (now
// 202 + task_id) well before the agent finishes its turn, so the canvas
// gets a prompt reply and the long turn completes on a detached forward
// ctx. The agent's eventual reply lands via the AGENT_MESSAGE WebSocket
// broadcast (the existing poll-mode contract).
//
// That's the happy path. The unhappy paths are what this store protects
// against:
//
// 1. WS push is best-effort — a dropped WebSocket (network blip, canvas
// hard-refresh between push and client registration, proxy restart
// mid-push) loses the reply. The canvas needs a way to recover the
// result without resubmitting the turn.
//
// 2. The cap-and-queue goroutine completes AFTER the canvas HTTP reply
// but BEFORE the client registered the WS handler. The WS push fires
// into the void.
//
// 3. Concurrent Decide / withdraw / kill-restart cycles can interleave
// with the agent's reply, and a second caller reading a half-written
// state would observe a torn result.
//
// The store is the durable buffer for (1) + (2): every cap-and-queue
// ack creates a TaskHandle, the detached goroutine writes the result via
// `Complete` when the agent replies, and the canvas can poll
// `GET /workspaces/:id/a2a/task/{task_id}` to recover. A 5-minute TTL +
// janitor goroutine bounds the memory footprint (5min × peak concurrent
// tasks; with a 90s budget + Cloudflare's 100s edge, the worst-case
// "task in flight" duration is ~30min including the agent turn itself,
// but the canvas can recover within 5min of the queued ack so the TTL
// is sized for that recovery window, not the full agent turn).
//
// Concurrency: package-level singleton (mu sync.RWMutex) — the store is
// pure in-process state, no DB, no per-handler state. A single instance
// per process is the natural shape; if a future deploy needs per-handler
// isolation (e.g. a multi-tenant SaaS that wants one store per tenant
// for blast-radius reasons), this is the place to refactor.
//
// Late-arrival safety: Complete is idempotent (first terminal wins —
// guarded by a CAS on the `completed` flag). A WS push that arrives
// after a poll already recovered the result is a no-op. This is the
// contract pin in TestTaskStore_LateArrivalRace.
import (
"errors"
"sync"
"time"
"github.com/google/uuid"
)
// ErrTaskNotFound is returned by Get when the task_id has either expired
// (TTL'd by the janitor) or never existed (caller typo, stale handle from
// a previous deploy, or — most commonly — a GET that raced the janitor
// by a hair). The two cases are indistinguishable to the caller, and
// that's intentional: leaking the existence-vs-not-yet would let a
// caller enumerate task_ids, which has no legitimate use.
var ErrTaskNotFound = errors.New("a2a task: not found")
// ErrTaskAlreadyCompleted is returned by Complete on a second call for a
// task that already has a terminal state. Distinct from ErrTaskNotFound
// so tests can assert the idempotency contract directly: a second
// Complete after a successful first one MUST return this error, not
// silently overwrite, not panic.
var ErrTaskAlreadyCompleted = errors.New("a2a task: already completed")
// TaskStatus is the lifecycle state of a TaskHandle.
type TaskStatus string
const (
// TaskStatusPending is the initial state when the cap-and-queue
// goroutine creates the handle. The agent has not yet replied.
TaskStatusPending TaskStatus = "pending"
// TaskStatusCompleted is the terminal state when Complete was
// called with a successful agent reply (HTTP 2xx from the
// workspace's A2A endpoint). Result + StatusCode are populated.
TaskStatusCompleted TaskStatus = "completed"
// TaskStatusFailed is the terminal state when Complete was
// called with a non-2xx / transport / dispatch error. Result
// may be empty; StatusCode is the proxy-side error code.
TaskStatusFailed TaskStatus = "failed"
)
// TaskResult is the buffered result of a completed A2A task. Mirrors
// the shape proxyA2ARequest would have returned to the canvas in the
// synchronous path, so a GET can synthesize the same response body the
// canvas would have parsed if it had held the connection.
type TaskResult struct {
// Status is the HTTP status code the canvas would have seen:
// 200 for a successful A2A dispatch, 4xx/5xx for an error.
// Note: NOT 202 — 202 was the queued ack, not the actual result.
Status int
// Body is the raw response body (JSON-RPC envelope for a
// message/send success, or {"error": "..."} envelope for a
// dispatch failure). Empty string for a TaskStatusFailed that
// lost the body to a transport error.
Body []byte
// ContentType is the response Content-Type. Almost always
// "application/json" (the proxy's default) but preserved so a
// future extension that streams binary parts doesn't have to
// refactor the store.
ContentType string
}
// TaskHandle is the in-memory record for a single cap-and-queue
// async-dispatch. Created by NewTaskHandle (called from ProxyA2A
// when the budget timer fires), completed by the detached goroutine
// when proxyA2ARequest returns, retrievable by task_id via the
// GET endpoint for canvas-side late-arrival recovery.
type TaskHandle struct {
// ID is the unique correlation key the canvas uses to
// reconcile the eventual WS push OR poll this store. UUID
// v4 so a caller can't enumerate neighboring task_ids.
ID string
// WorkspaceID is the workspace the task was raised against.
// The GET endpoint authz-checks against this — a caller
// from workspace A cannot poll a task_id created by
// workspace B (the cross-workspace approval-gate class
// from #2574 / #2593, mirrored here for consistency).
WorkspaceID string
// Method is the A2A method name (e.g. "message/send") the
// task was created for. Preserved so a future generic
// store (other A2A methods) doesn't lose the dispatch
// semantics.
Method string
// CreatedAt is when the handle was created (== the moment
// the cap-and-queue budget fired). The janitor uses this
// to compute age for the TTL eviction.
CreatedAt time.Time
// mu guards the mutable state below. RWMutex so a burst
// of GETs (the late-arrival polling path) doesn't serialize
// against each other.
mu sync.RWMutex
// status is the current lifecycle state. Read-mostly
// after Complete fires, so RWMutex is the right shape.
status TaskStatus
// result is the buffered reply, populated by Complete.
// nil until status moves out of pending.
result *TaskResult
}
// Status returns the current lifecycle state. Safe for concurrent
// reads — takes the read lock so a concurrent Complete can't tear
// the read.
func (t *TaskHandle) Status() TaskStatus {
t.mu.RLock()
defer t.mu.RUnlock()
return t.status
}
// Result returns the buffered reply + a bool indicating whether
// the task is in a terminal state. The bool is true once status
// has moved to completed/failed; before that, result is nil and
// the bool is false (caller should keep polling).
//
// Returned by-value (not by-pointer) so a caller can't observe a
// later mutation without re-reading. The Result pointer inside
// the handle is replaced atomically by Complete, so even a caller
// that captured a pointer would see the old value if Complete
// fires after; by-value copies the *pointer*, which is a known
// limitation — but the *content* is immutable post-Complete (the
// TaskResult struct itself isn't mutated after assignment), so
// the captured pointer is safe to read.
func (t *TaskHandle) Result() (TaskResult, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.status == TaskStatusPending || t.result == nil {
return TaskResult{}, false
}
return *t.result, true
}
// Complete moves the handle to a terminal state, buffering the
// result. Idempotent: a second call after a successful first one
// returns ErrTaskAlreadyCompleted (does NOT overwrite the result).
// This is the contract pin for the late-arrival race: a WS push
// that fires AFTER a poll already recovered the result is a no-op
// at the store layer; the canvas side then sees the same body
// either way and the only cost is one wasted broadcast.
//
// The CAS pattern (read status, write status if pending) is
// correct under concurrent Complete calls: exactly one caller
// wins, the other gets ErrTaskAlreadyCompleted. The first
// winner's result is what survives.
func (t *TaskHandle) Complete(status TaskStatus, result TaskResult) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.status != TaskStatusPending {
return ErrTaskAlreadyCompleted
}
t.status = status
// Defensive copy so a caller mutating their local TaskResult
// after Complete returns can't change the buffered result.
copied := result
if result.Body != nil {
copied.Body = append([]byte(nil), result.Body...)
}
t.result = &copied
return nil
}
// A2ATaskStore is the package-level in-memory store. Singleton
// per process; see file header for the multi-tenant refactor
// note. RWMutex so the GET-side polling path (which can fire
// every 5s per pending task) doesn't serialize against the
// Create/Complete paths.
type A2ATaskStore struct {
mu sync.RWMutex
tasks map[string]*TaskHandle
janitor *time.Ticker
stop chan struct{}
once sync.Once
}
// a2aTaskStoreTTL is how long a TaskHandle survives after
// creation before the janitor evicts it. 5min is the canvas
// recovery window per the design (#2818): the WS push is
// best-effort, the poll fallback is 5s, so a canvas that misses
// both the push AND the first poll can still recover via
// repeated polls for up to 5min before the store evicts the
// handle. After that, the agent reply is lost to the canvas
// (still in the activity_logs / events history for the agent
// owner, just not surfaceable via the task_id correlation key).
//
// 5min is NOT the agent turn timeout — a 30min agent turn can
// still be in flight at the 5min mark, and the result is then
// delivered via WS only (the canvas's only hope of seeing it
// is the push). The TTL is a recovery-window bound, not an
// agent-runtime bound.
const a2aTaskStoreTTL = 5 * time.Minute
// a2aTaskStoreJanitorInterval is how often the janitor scans
// for expired tasks. 60s is a fine balance: fast enough that
// the map doesn't grow unbounded under steady load, slow
// enough that the scan isn't a hot path.
const a2aTaskStoreJanitorInterval = 60 * time.Second
// a2aTaskStore is the package-level singleton. Initialized
// lazily on first NewTaskHandle call so tests that don't
// touch the store don't pay the janitor goroutine cost.
var a2aTaskStore = &A2ATaskStore{
tasks: make(map[string]*TaskHandle),
stop: make(chan struct{}),
}
// init starts the janitor on package import. The cost is one
// goroutine per process; the benefit is that the store is
// always ready when ProxyA2A first fires. Tests that don't
// touch the store still pay the goroutine cost — acceptable
// because the janitor only fires every 60s and does an O(N)
// scan of a map that's almost always empty.
func init() {
a2aTaskStore.once.Do(func() {
a2aTaskStore.janitor = time.NewTicker(a2aTaskStoreJanitorInterval)
go a2aTaskStore.runJanitor()
})
}
// runJanitor is the periodic eviction loop. Runs forever until
// process exit; no shutdown path because the process is
// short-lived (the platform restarts on deploy, and the OS
// reclaims the map on exit).
func (s *A2ATaskStore) runJanitor() {
for {
select {
case <-s.janitor.C:
s.pruneExpired()
case <-s.stop:
return
}
}
}
// pruneExpired evicts handles older than a2aTaskStoreTTL.
// O(N) over the map; N is bounded by the peak concurrent
// in-flight tasks (typically <100 even on a busy platform).
// The take-and-release lock pattern (copy keys under read
// lock, delete under write lock) is the standard Go map
// iteration under lock idiom — delete-during-iteration
// would invalidate the cursor.
func (s *A2ATaskStore) pruneExpired() {
cutoff := time.Now().Add(-a2aTaskStoreTTL)
var expired []string
s.mu.RLock()
for id, t := range s.tasks {
if t.CreatedAt.Before(cutoff) {
expired = append(expired, id)
}
}
s.mu.RUnlock()
if len(expired) == 0 {
return
}
s.mu.Lock()
for _, id := range expired {
delete(s.tasks, id)
}
s.mu.Unlock()
}
// NewTaskHandle creates a new pending TaskHandle, inserts it
// into the store, and returns it. The caller (ProxyA2A's
// cap-and-queue branch) embeds the returned ID in the 202
// response so the canvas can correlate.
func (s *A2ATaskStore) NewTaskHandle(workspaceID, method string) *TaskHandle {
t := &TaskHandle{
ID: uuid.NewString(),
WorkspaceID: workspaceID,
Method: method,
CreatedAt: time.Now(),
status: TaskStatusPending,
}
s.mu.Lock()
s.tasks[t.ID] = t
s.mu.Unlock()
return t
}
// Get retrieves a TaskHandle by ID. Returns ErrTaskNotFound
// for expired or never-existed handles. The authz check
// (workspace_id match) is the caller's responsibility —
// Get returns the handle regardless of workspace, the
// handler then checks handle.WorkspaceID against the
// caller's workspace.
func (s *A2ATaskStore) Get(taskID string) (*TaskHandle, error) {
s.mu.RLock()
t, ok := s.tasks[taskID]
s.mu.RUnlock()
if !ok {
return nil, ErrTaskNotFound
}
return t, nil
}
@@ -0,0 +1,315 @@
package handlers
// a2a_task_store_test.go — unit tests for the in-memory task store
// (core#2751 / #2818 async-dispatch contract). The store is the
// durable buffer for the cap-and-queue's late-arrival path: a
// canvas that misses the WS push polls the store via the
// GET /workspaces/:id/a2a/task/{task_id} endpoint to recover
// the buffered result. Tests pin:
// - Lifecycle: NewTaskHandle → Complete → Get round-trip
// - Concurrency: parallel Complete calls (only one wins — the
// first terminal is the durable result)
// - Late-arrival race: Get after Complete returns the buffered
// result; a second Complete is a no-op
// - TTL eviction: handles older than a2aTaskStoreTTL are pruned
// - Error paths: ErrTaskNotFound for expired/missing,
// ErrTaskAlreadyCompleted for double-Complete
import (
"bytes"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestTaskStore_NewPendingHandle_GeneratesUniqueIDs pins the
// contract that every NewTaskHandle returns a fresh UUID v4
// (no collisions even under burst). Two handles from the same
// store must have different IDs.
func TestTaskStore_NewPendingHandle_GeneratesUniqueIDs(t *testing.T) {
s := newIsolatedTaskStore(t)
h1 := s.NewTaskHandle("ws-1", "message/send")
h2 := s.NewTaskHandle("ws-1", "message/send")
if h1.ID == h2.ID {
t.Errorf("expected unique IDs, got %q twice", h1.ID)
}
if h1.Status() != TaskStatusPending {
t.Errorf("new handle should be pending, got %q", h1.Status())
}
if h1.WorkspaceID != "ws-1" {
t.Errorf("WorkspaceID = %q, want ws-1", h1.WorkspaceID)
}
if h1.Method != "message/send" {
t.Errorf("Method = %q, want message/send", h1.Method)
}
}
// TestTaskStore_Complete_StoresResult — happy path: complete
// moves the handle out of pending, Result returns the buffered
// shape.
func TestTaskStore_Complete_StoresResult(t *testing.T) {
s := newIsolatedTaskStore(t)
h := s.NewTaskHandle("ws-1", "message/send")
body := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hi"}}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{
Status: 200,
Body: body,
ContentType: "application/json",
}); err != nil {
t.Fatalf("first Complete: %v", err)
}
if got := h.Status(); got != TaskStatusCompleted {
t.Errorf("Status after Complete = %q, want completed", got)
}
result, ok := h.Result()
if !ok {
t.Fatal("Result should be available after Complete")
}
if result.Status != 200 {
t.Errorf("Result.Status = %d, want 200", result.Status)
}
if !bytes.Equal(result.Body, body) {
t.Errorf("Result.Body = %q, want %q", result.Body, body)
}
if result.ContentType != "application/json" {
t.Errorf("Result.ContentType = %q, want application/json", result.ContentType)
}
}
// TestTaskStore_Complete_Idempotent — the load-bearing
// contract pin. A second Complete on an already-terminal
// handle returns ErrTaskAlreadyCompleted AND does NOT
// overwrite the buffered result. This is what protects
// the late-arrival race: a WS push that fires after a
// poll already recovered the result is a no-op at the
// store layer; the first Complete wins.
func TestTaskStore_Complete_Idempotent(t *testing.T) {
s := newIsolatedTaskStore(t)
h := s.NewTaskHandle("ws-1", "message/send")
first := []byte(`{"result":"first"}`)
second := []byte(`{"result":"second"}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: first}); err != nil {
t.Fatalf("first Complete: %v", err)
}
err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: second})
if err != ErrTaskAlreadyCompleted {
t.Errorf("second Complete: want ErrTaskAlreadyCompleted, got %v", err)
}
// First result survives.
result, ok := h.Result()
if !ok {
t.Fatal("Result should be available after first Complete")
}
if !bytes.Equal(result.Body, first) {
t.Errorf("second Complete overwrote result: got %q, want %q", result.Body, first)
}
}
// TestTaskStore_Complete_ConcurrentFirstWins — the concurrent
// version of the idempotency contract. Multiple goroutines
// race to Complete the same handle; exactly one wins, the
// others get ErrTaskAlreadyCompleted, and the surviving
// result is the one from the winning goroutine.
func TestTaskStore_Complete_ConcurrentFirstWins(t *testing.T) {
s := newIsolatedTaskStore(t)
h := s.NewTaskHandle("ws-1", "message/send")
const N = 50
var winners atomic.Int32
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
i := i
go func() {
defer wg.Done()
body := []byte(`{"id":` + itoaForTest(i) + `}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: body}); err == nil {
winners.Add(1)
} else if err != ErrTaskAlreadyCompleted {
t.Errorf("goroutine %d: unexpected error %v", i, err)
}
}()
}
wg.Wait()
if got := winners.Load(); got != 1 {
t.Errorf("expected exactly 1 winner, got %d", got)
}
// Result is the winning goroutine's body — we don't know
// which one, but it must be non-nil and parseable as the
// shape Complete wrote.
result, ok := h.Result()
if !ok {
t.Fatal("Result should be available after concurrent Complete")
}
if len(result.Body) == 0 {
t.Error("winning Complete produced empty body")
}
}
// TestTaskStore_Get_UnknownTaskID — the basic error path.
// Get for a task_id that never existed returns
// ErrTaskNotFound.
func TestTaskStore_Get_UnknownTaskID(t *testing.T) {
s := newIsolatedTaskStore(t)
if _, err := s.Get("00000000-0000-0000-0000-000000000000"); err != ErrTaskNotFound {
t.Errorf("Get unknown: want ErrTaskNotFound, got %v", err)
}
}
// TestTaskStore_Prune_RemovesExpired — the TTL eviction
// contract. A handle older than a2aTaskStoreTTL is removed
// from the store on the next pruneExpired call. Test
// bypasses the real timer by calling pruneExpired
// directly with a synthetically-aged handle.
func TestTaskStore_Prune_RemovesExpired(t *testing.T) {
s := newIsolatedTaskStore(t)
h := s.NewTaskHandle("ws-1", "message/send")
// Backdate the handle past the TTL.
h.CreatedAt = time.Now().Add(-2 * a2aTaskStoreTTL)
s.pruneExpired()
if _, err := s.Get(h.ID); err != ErrTaskNotFound {
t.Errorf("expected expired handle to be pruned, got %v", err)
}
}
// TestTaskStore_LateArrivalRace — the contract-critical
// integration test for #2818. The full flow:
// 1. cap-and-queue creates the handle, returns task_id
// 2. agent is slow; canvas polls via the GET endpoint
// while the handle is still pending → 202
// 3. agent replies; goroutine Completes the handle
// 4. canvas polls again → 200 with the buffered result
// 5. WS push fires after the poll already recovered
// the result; the late-arrival Complete is a no-op
// (ErrTaskAlreadyCompleted)
func TestTaskStore_LateArrivalRace(t *testing.T) {
s := newIsolatedTaskStore(t)
h := s.NewTaskHandle("ws-1", "message/send")
// Step 2: poll while pending.
if h.Status() != TaskStatusPending {
t.Fatalf("expected pending, got %q", h.Status())
}
if _, ok := h.Result(); ok {
t.Fatal("Result should not be available before Complete")
}
// Step 3: agent replies.
reply := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hello"}}`)
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: reply}); err != nil {
t.Fatalf("Complete: %v", err)
}
// Step 4: poll after Complete returns the buffered result.
result, ok := h.Result()
if !ok {
t.Fatal("Result should be available after Complete")
}
if !bytes.Equal(result.Body, reply) {
t.Errorf("Result.Body = %q, want %q", result.Body, reply)
}
// Step 5: a late-arrival Complete (e.g. WS push firing
// after the canvas's poll already recovered) is a
// no-op — the second call returns ErrTaskAlreadyCompleted
// and does NOT overwrite the first result.
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: []byte(`{"late":"winner"}`)}); err != ErrTaskAlreadyCompleted {
t.Errorf("late-arrival Complete: want ErrTaskAlreadyCompleted, got %v", err)
}
result2, _ := h.Result()
if !bytes.Equal(result2.Body, reply) {
t.Errorf("late-arrival Complete overwrote result: got %q, want %q", result2.Body, reply)
}
}
// TestTaskStore_ConcurrentAccess — many goroutines
// creating + completing + getting handles, no data race.
// Run with `go test -race` to catch the race detector.
func TestTaskStore_ConcurrentAccess(t *testing.T) {
s := newIsolatedTaskStore(t)
const N = 100
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
h := s.NewTaskHandle("ws-1", "message/send")
_ = h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: []byte(`ok`)})
_, _ = s.Get(h.ID)
}()
}
wg.Wait()
}
// TestExtractA2AMethod pins the lightweight JSON peek used
// by the cap-and-queue branch to record which A2A method
// raised the task. Tolerant of malformed input (returns
// "" on parse error so the task_id is still issued).
func TestExtractA2AMethod(t *testing.T) {
cases := []struct {
name string
body string
want string
}{
{"message/send", `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{}}`, "message/send"},
{"message/stream", `{"method":"message/stream"}`, "message/stream"},
{"initialize", `{"method":"initialize"}`, "initialize"},
{"missing-method", `{"jsonrpc":"2.0","id":"1","params":{}}`, ""},
{"non-string-method", `{"method":42}`, ""},
{"malformed", `{"method":`, ""},
{"empty", ``, ""},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if got := extractA2AMethod([]byte(c.body)); got != c.want {
t.Errorf("extractA2AMethod(%q) = %q, want %q", c.body, got, c.want)
}
})
}
}
// newIsolatedTaskStore creates a fresh A2ATaskStore for the
// test, replacing the package-level singleton. Avoids the
// real janitor goroutine and the shared-state bleed that
// would happen if two tests used the same store.
func newIsolatedTaskStore(t *testing.T) *A2ATaskStore {
t.Helper()
return &A2ATaskStore{
tasks: make(map[string]*TaskHandle),
stop: make(chan struct{}),
}
}
// itoa is a tiny no-allocation integer-to-string for the
// concurrent test (faster than strconv.Itoa in a hot loop).
// Defined in a way that doesn't collide with class1_ast_gate_test.go
// (which has its own itoa) by using a per-test wrapper.
func itoaForTest(i int) string {
if i == 0 {
return "0"
}
neg := i < 0
if neg {
i = -i
}
var buf [20]byte
pos := len(buf)
for i > 0 {
pos--
buf[pos] = byte('0' + i%10)
i /= 10
}
if neg {
pos--
buf[pos] = '-'
}
return string(buf[pos:])
}
@@ -250,6 +250,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// org-level token). Existence-non-inferring 404 on auth failure.
r.GET("/workspaces/:id/a2a/queue/:queue_id", wh.GetA2AQueueStatus)
// A2A task lookup (#2818) — the late-arrival recovery path for
// the cap-and-queue async-dispatch contract. Canvas polls this
// every 5s when the WS push is missed, looking for the buffered
// result. Same routing rationale as /a2a/queue/:queue_id (auth
// is per-handler against the task's WorkspaceID).
r.GET("/workspaces/:id/a2a/task/:taskId", wh.GetA2ATask)
// Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a.
// Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13
// by requiring a valid bearer token for any workspace that has one on file.