fix(workspace-server#2818 Phase 1+2): task_id correlation key + late-arrival recovery #2852
@@ -0,0 +1,287 @@
|
||||
package handlers
|
||||
|
||||
// a2a_get_task_test.go — integration tests for the GetA2ATask
|
||||
// HTTP handler (#2818, #2751). The handler is the canvas's
|
||||
// late-arrival recovery path: when the WS push is missed, the
|
||||
// canvas polls GET /workspaces/:id/a2a/task/{task_id} every
|
||||
// 5s to recover the buffered result. Tests pin:
|
||||
// - Pending task: 202 + {status:"pending", task_id} (canvas
|
||||
// keeps polling)
|
||||
// - Completed task: 200 + buffered result body (canvas
|
||||
// synthesizes the response and renders the agent reply)
|
||||
// - Failed task: 200 + buffered error envelope
|
||||
// - Unknown task_id: 404 (janitor-evicted or never-existed)
|
||||
// - Cross-workspace: 403 (caller is not the task's creator)
|
||||
// - Late-arrival race: Complete after a poll-recovery is a
|
||||
// no-op (TaskStore-level idempotency)
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestGetA2ATask_PendingReturns202 is the canvas's "keep
|
||||
// polling" signal: the task is still in flight, no result is
|
||||
// available yet, the canvas should keep its 5s poll timer
|
||||
// running. HTTP 202 (not 200) so the canvas can distinguish
|
||||
// "agent still working" from "agent finished" without reading
|
||||
// the body — the body is a small {status,task_id} envelope.
|
||||
func TestGetA2ATask_PendingReturns202(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Use the package-level store (initialized via init()) so
|
||||
// the test sees the real janitor/wiring.
|
||||
h := a2aTaskStore.NewTaskHandle("ws-pending", "message/send")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-pending"}, {Key: "taskId", Value: h.ID}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-pending/a2a/task/"+h.ID, nil)
|
||||
|
||||
handler.GetA2ATask(c)
|
||||
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected 202 Accepted for pending, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "pending" {
|
||||
t.Errorf("status = %v, want pending", resp["status"])
|
||||
}
|
||||
if resp["task_id"] != h.ID {
|
||||
t.Errorf("task_id = %v, want %v", resp["task_id"], h.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2ATask_CompletedReturns200WithBody is the canvas's
|
||||
// "agent finished, here's the result" signal. The body is
|
||||
// base64-encoded to preserve the exact bytes of the original
|
||||
// JSON-RPC response (the canvas decodes + re-parses).
|
||||
func TestGetA2ATask_CompletedReturns200WithBody(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
h := a2aTaskStore.NewTaskHandle("ws-done", "message/send")
|
||||
reply := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hello"}}`)
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{
|
||||
Status: 200,
|
||||
Body: reply,
|
||||
ContentType: "application/json",
|
||||
}); err != nil {
|
||||
t.Fatalf("Complete: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-done"}, {Key: "taskId", Value: h.ID}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-done/a2a/task/"+h.ID, nil)
|
||||
|
||||
handler.GetA2ATask(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 OK for completed, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "completed" {
|
||||
t.Errorf("status = %v, want completed", resp["status"])
|
||||
}
|
||||
if got, _ := resp["http_status"].(float64); int(got) != 200 {
|
||||
t.Errorf("http_status = %v, want 200", resp["http_status"])
|
||||
}
|
||||
encoded, _ := resp["body"].(string)
|
||||
decoded, err := base64.StdEncoding.DecodeString(encoded)
|
||||
if err != nil {
|
||||
t.Fatalf("body is not valid base64: %v", err)
|
||||
}
|
||||
if string(decoded) != string(reply) {
|
||||
t.Errorf("decoded body = %q, want %q", decoded, reply)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2ATask_FailedReturns200WithError pins the error
|
||||
// path: the agent's dispatch returned a non-2xx / transport
|
||||
// error. The handler returns 200 with status:"failed" and
|
||||
// the buffered error envelope (the canvas treats this as
|
||||
// "agent replied, but with an error" — the same shape as a
|
||||
// synchronous 4xx/5xx).
|
||||
func TestGetA2ATask_FailedReturns200WithError(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
h := a2aTaskStore.NewTaskHandle("ws-err", "message/send")
|
||||
errEnv := []byte(`{"error":"agent timeout"}`)
|
||||
if err := h.Complete(TaskStatusFailed, TaskResult{
|
||||
Status: http.StatusGatewayTimeout,
|
||||
Body: errEnv,
|
||||
ContentType: "application/json",
|
||||
}); err != nil {
|
||||
t.Fatalf("Complete: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-err"}, {Key: "taskId", Value: h.ID}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-err/a2a/task/"+h.ID, nil)
|
||||
|
||||
handler.GetA2ATask(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 OK (with error envelope in body) for failed, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "failed" {
|
||||
t.Errorf("status = %v, want failed", resp["status"])
|
||||
}
|
||||
if got, _ := resp["http_status"].(float64); int(got) != http.StatusGatewayTimeout {
|
||||
t.Errorf("http_status = %v, want 504", resp["http_status"])
|
||||
}
|
||||
encoded, _ := resp["body"].(string)
|
||||
decoded, err := base64.StdEncoding.DecodeString(encoded)
|
||||
if err != nil {
|
||||
t.Fatalf("body is not valid base64: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(decoded), "agent timeout") {
|
||||
t.Errorf("decoded body = %q, want substring %q", decoded, "agent timeout")
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2ATask_UnknownReturns404 pins the 404 contract:
|
||||
// the task_id is unknown (never existed) or has been evicted
|
||||
// by the janitor. Either way, the canvas's contract on 404
|
||||
// is "give up, surface an error to the user." The handler
|
||||
// returns 404 with a generic message that does NOT leak
|
||||
// whether the task_id was once known.
|
||||
func TestGetA2ATask_UnknownReturns404(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-x"}, {Key: "taskId", Value: "00000000-0000-0000-0000-000000000000"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-x/a2a/task/00000000-0000-0000-0000-000000000000", nil)
|
||||
|
||||
handler.GetA2ATask(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404 Not Found, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2ATask_CrossWorkspaceReturns403 pins the authz
|
||||
// check. A caller in workspace A polls a task_id that was
|
||||
// created in workspace B. The handler returns 403 (not 404)
|
||||
// so the caller can distinguish "wrong workspace" (their
|
||||
// request is auth-blocked) from "task truly gone" (janitor
|
||||
// evicted it, no auth concern). The 404-vs-403 distinction
|
||||
// matters for retry logic: 403 is a permanent auth failure
|
||||
// (no point retrying); 404 might be a transient eviction
|
||||
// (retrying with a different task_id is the right move).
|
||||
func TestGetA2ATask_CrossWorkspaceReturns403(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
h := a2aTaskStore.NewTaskHandle("ws-creator", "message/send")
|
||||
// Don't complete — pending is fine for this test, the
|
||||
// authz check runs before the status dispatch.
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-evil"}, {Key: "taskId", Value: h.ID}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-evil/a2a/task/"+h.ID, nil)
|
||||
|
||||
handler.GetA2ATask(c)
|
||||
|
||||
if w.Code != http.StatusForbidden {
|
||||
t.Errorf("expected 403 Forbidden, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2ATask_LateArrivalRace pins the full late-arrival
|
||||
// flow end-to-end through the HTTP handler. Sequence:
|
||||
// 1. cap-and-queue creates the handle, returns task_id
|
||||
// 2. canvas polls: pending (202)
|
||||
// 3. agent replies: handle.Complete (200 completed)
|
||||
// 4. canvas polls again: completed (200)
|
||||
// 5. WS push fires after step 4; second Complete is a
|
||||
// no-op (idempotent at the store layer)
|
||||
// 6. canvas polls again: still completed (result unchanged)
|
||||
func TestGetA2ATask_LateArrivalRace(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
h := a2aTaskStore.NewTaskHandle("ws-race", "message/send")
|
||||
|
||||
// Step 2: poll while pending → 202
|
||||
w1 := httptest.NewRecorder()
|
||||
c1, _ := gin.CreateTestContext(w1)
|
||||
c1.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}}
|
||||
c1.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil)
|
||||
handler.GetA2ATask(c1)
|
||||
if w1.Code != http.StatusAccepted {
|
||||
t.Fatalf("step 2 (pending poll): want 202, got %d", w1.Code)
|
||||
}
|
||||
|
||||
// Step 3: agent replies
|
||||
first := []byte(`{"result":"first"}`)
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: first}); err != nil {
|
||||
t.Fatalf("first Complete: %v", err)
|
||||
}
|
||||
|
||||
// Step 4: poll after complete → 200 with first result
|
||||
w2 := httptest.NewRecorder()
|
||||
c2, _ := gin.CreateTestContext(w2)
|
||||
c2.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}}
|
||||
c2.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil)
|
||||
handler.GetA2ATask(c2)
|
||||
if w2.Code != http.StatusOK {
|
||||
t.Fatalf("step 4 (completed poll): want 200, got %d", w2.Code)
|
||||
}
|
||||
var resp2 map[string]interface{}
|
||||
_ = json.Unmarshal(w2.Body.Bytes(), &resp2)
|
||||
enc2, _ := resp2["body"].(string)
|
||||
dec2, _ := base64.StdEncoding.DecodeString(enc2)
|
||||
if string(dec2) != string(first) {
|
||||
t.Errorf("step 4 body = %q, want %q", dec2, first)
|
||||
}
|
||||
|
||||
// Step 5: late-arrival Complete is a no-op
|
||||
second := []byte(`{"result":"second"}`)
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: second}); err != ErrTaskAlreadyCompleted {
|
||||
t.Errorf("step 5 (late Complete): want ErrTaskAlreadyCompleted, got %v", err)
|
||||
}
|
||||
|
||||
// Step 6: poll again — result must NOT have been overwritten
|
||||
w3 := httptest.NewRecorder()
|
||||
c3, _ := gin.CreateTestContext(w3)
|
||||
c3.Params = gin.Params{{Key: "id", Value: "ws-race"}, {Key: "taskId", Value: h.ID}}
|
||||
c3.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.ID, nil)
|
||||
handler.GetA2ATask(c3)
|
||||
var resp3 map[string]interface{}
|
||||
_ = json.Unmarshal(w3.Body.Bytes(), &resp3)
|
||||
enc3, _ := resp3["body"].(string)
|
||||
dec3, _ := base64.StdEncoding.DecodeString(enc3)
|
||||
if string(dec3) != string(first) {
|
||||
t.Errorf("step 6 (post-late-arrival poll): body = %q, want %q (first result must survive)", dec3, first)
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
@@ -415,10 +416,43 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
detached := context.WithoutCancel(ctx)
|
||||
budget := canvasA2ASyncBudget() // local copy for the time.After below
|
||||
done := make(chan a2aResult, 1)
|
||||
|
||||
// #2818 — pre-create the TaskHandle BEFORE the goroutine
|
||||
// starts so the canvas can poll for the result even if the
|
||||
// agent reply is still in flight when the HTTP response is
|
||||
// flushed. The task_id becomes the primary correlation key
|
||||
// the canvas uses to recover the result if the WS push is
|
||||
// missed (5s polling fallback per the design).
|
||||
//
|
||||
// The task_id is generic across all A2A methods; we record
|
||||
// which method raised the task for future dispatch-meta
|
||||
// surfaces (and the GetA2ATask handler can echo it back).
|
||||
a2aMethod := extractA2AMethod(body)
|
||||
taskHandle := a2aTaskStore.NewTaskHandle(workspaceID, a2aMethod)
|
||||
|
||||
h.asyncWG.Add(1)
|
||||
go func() {
|
||||
defer h.asyncWG.Done()
|
||||
s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser)
|
||||
// #2818 — buffer the result into the store so a
|
||||
// late canvas poll can recover. First terminal
|
||||
// wins (idempotent CAS — see TaskHandle.Complete).
|
||||
// The WS broadcast still fires unconditionally
|
||||
// below; the store is the durable buffer for the
|
||||
// case where the WS push was missed.
|
||||
if pe != nil {
|
||||
_ = taskHandle.Complete(TaskStatusFailed, TaskResult{
|
||||
Status: pe.Status,
|
||||
Body: a2aErrorResponseBody(pe),
|
||||
ContentType: "application/json",
|
||||
})
|
||||
} else {
|
||||
_ = taskHandle.Complete(TaskStatusCompleted, TaskResult{
|
||||
Status: s,
|
||||
Body: b,
|
||||
ContentType: "application/json",
|
||||
})
|
||||
}
|
||||
done <- a2aResult{s, b, pe}
|
||||
}()
|
||||
select {
|
||||
@@ -433,10 +467,22 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
c.Data(r.status, "application/json", r.body)
|
||||
return
|
||||
case <-time.After(budget):
|
||||
// Outlived CF's edge limit — ack queued; the goroutine finishes and
|
||||
// the reply lands via WS. The canvas already treats `queued` as
|
||||
// "still processing" (delivery_mode mirrors poll-mode).
|
||||
c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"})
|
||||
// Outlived CF's edge limit — ack queued (202 + task_id);
|
||||
// the goroutine finishes and the reply lands via WS. The
|
||||
// canvas correlates the eventual A2A_RESPONSE WS push to
|
||||
// the task_id (primary key) and falls back to polling
|
||||
// GET /workspaces/:id/a2a/task/{task_id} if the WS push
|
||||
// is missed (5s timer per the design). The 202 status
|
||||
// (not 200) is the new wire shape — older canvases that
|
||||
// only check for {status:"queued"} still match (the
|
||||
// body shape is backward-compatible with the legacy
|
||||
// 200-ack that #2800 shipped).
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"status": "queued",
|
||||
"task_id": taskHandle.ID,
|
||||
"delivery_mode": "push-async",
|
||||
"method": "message/send",
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -453,6 +499,108 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
c.Data(status, "application/json", respBody)
|
||||
}
|
||||
|
||||
// GetA2ATask handles GET /workspaces/:id/a2a/task/:taskId — the canvas's
|
||||
// late-arrival recovery path for the cap-and-queue async-dispatch
|
||||
// contract (#2818, #2751).
|
||||
//
|
||||
// Lifecycle:
|
||||
// - Cap-and-queue fires the budget timer → handler issues a 202 +
|
||||
// task_id, creates a TaskHandle in `pending` state, the detached
|
||||
// goroutine continues the agent dispatch.
|
||||
// - Canvas correlates the eventual A2A_RESPONSE WS push to task_id
|
||||
// (primary key).
|
||||
// - If the WS push is missed (drop, hard refresh, late registration),
|
||||
// the canvas's 5s polling fallback calls THIS endpoint to recover
|
||||
// the buffered result. The TaskHandle has already moved to
|
||||
// `completed` or `failed` (the goroutine writes the result via
|
||||
// TaskHandle.Complete before the WS broadcast fires).
|
||||
// - 5min after creation the janitor evicts the handle; subsequent
|
||||
// GETs return 404 (the canvas's contract on a 404 is "give up,
|
||||
// surface an error to the user").
|
||||
//
|
||||
// Authz: the caller's workspace token must match the handle's
|
||||
// WorkspaceID. The check is the same pattern approvals.Withdraw uses
|
||||
// (the row's creator, not the path :id) — but for A2A tasks the
|
||||
// creator IS the row's workspace, and the path's :id is also the
|
||||
// creator workspace (A2A isn't cross-workspace in the way approval
|
||||
// gates are). The check is therefore: path :id == handle.WorkspaceID.
|
||||
// A mismatch returns 403 (caller is not the workspace that raised
|
||||
// the task; could be a misrouted request or a stale handle from a
|
||||
// previous deploy).
|
||||
//
|
||||
// Response shape (completed):
|
||||
//
|
||||
// HTTP/1.1 200 OK
|
||||
// { "status": "completed", "task_id": "...", "http_status": 200,
|
||||
// "body": <base64 of original response body>, "content_type": "..." }
|
||||
//
|
||||
// Response shape (still pending):
|
||||
//
|
||||
// HTTP/1.1 202 Accepted
|
||||
// { "status": "pending", "task_id": "..." }
|
||||
//
|
||||
// Response shape (failed):
|
||||
//
|
||||
// HTTP/1.1 200 OK
|
||||
// { "status": "failed", "task_id": "...", "http_status": <error code>,
|
||||
// "body": <base64 of error envelope>, "content_type": "..." }
|
||||
//
|
||||
// HTTP 202 is used for "pending" so the canvas can distinguish
|
||||
// "agent still working, keep polling" from "agent finished, here's
|
||||
// the result" without reading the body. Body is base64-encoded to
|
||||
// avoid JSON escaping for arbitrary JSON-RPC payload shapes (some
|
||||
// upstream encoders produce non-UTF-8 bytes for binary parts).
|
||||
func (h *WorkspaceHandler) GetA2ATask(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
taskID := c.Param("taskId")
|
||||
|
||||
handle, err := a2aTaskStore.Get(taskID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "task not found or expired"})
|
||||
return
|
||||
}
|
||||
|
||||
// Authz: path :id must match the handle's WorkspaceID. A
|
||||
// mismatch is a 403, not a 404, so the caller can distinguish
|
||||
// "wrong workspace" from "task truly not in the store".
|
||||
if handle.WorkspaceID != workspaceID {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "task does not belong to this workspace"})
|
||||
return
|
||||
}
|
||||
|
||||
switch handle.Status() {
|
||||
case TaskStatusPending:
|
||||
// Still in flight. Canvas keeps polling.
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"status": "pending",
|
||||
"task_id": handle.ID,
|
||||
})
|
||||
return
|
||||
case TaskStatusCompleted, TaskStatusFailed:
|
||||
result, _ := handle.Result()
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": string(handle.Status()),
|
||||
"task_id": handle.ID,
|
||||
"http_status": result.Status,
|
||||
"body": base64.StdEncoding.EncodeToString(result.Body),
|
||||
"content_type": result.ContentType,
|
||||
})
|
||||
return
|
||||
default:
|
||||
// Defensive: any future TaskStatus value lands here.
|
||||
// Returning 500 surfaces the bug to the canvas as a
|
||||
// transport-style error (the canvas's contract on 5xx
|
||||
// is "surface to user"); 200 with a weird body would
|
||||
// be silent corruption.
|
||||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"error": "unknown task status",
|
||||
"status": string(handle.Status()),
|
||||
"task_id": handle.ID,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// checkWorkspaceBudget returns a proxyA2AError with 402 when the workspace has
|
||||
// exceeded ANY of its configured per-period budget limits (hourly/daily/weekly/
|
||||
// monthly — see budget_periods.go). Per-period spend is the rolling-window sum
|
||||
@@ -1232,6 +1380,49 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
// extractA2AMethod peeks at the top-level `method` field of a JSON-RPC
// envelope without modifying body. The cap-and-queue branch (#2818) uses
// it to record which A2A method raised a task.
//
// Only `method` is decoded: every other top-level value (notably
// `params`, which carries the message payload) is kept as raw bytes, so
// the peek does not build a tree for data it never reads.
//
// It returns "" when body is not a JSON object, when `method` is absent,
// or when `method` is not a JSON string. The key is matched exactly
// ("method"; no case folding).
//
// It does NOT replace normalizeA2APayload, which remains the
// authoritative shaping step for the downstream dispatch.
func extractA2AMethod(body []byte) string {
	var envelope map[string]json.RawMessage
	if err := json.Unmarshal(body, &envelope); err != nil {
		return ""
	}
	raw, ok := envelope["method"]
	if !ok {
		return ""
	}
	// A non-string value fails to decode into a string and yields "".
	// A JSON null decodes without error and leaves method empty.
	var method string
	if err := json.Unmarshal(raw, &method); err != nil {
		return ""
	}
	return method
}
|
||||
|
||||
// a2aErrorResponseBody synthesizes the JSON response body for a
|
||||
// proxyA2AError — used by the cap-and-queue goroutine to buffer the
|
||||
// result into the TaskStore so a late canvas poll can recover the
|
||||
// error envelope. Mirrors the wire shape c.JSON(pe.Status, pe.Response)
|
||||
// would have produced in the synchronous path: marshal the gin.H map
|
||||
// to JSON. Failure to marshal (gin.H contains an unserializable
|
||||
// value, e.g. a channel) is impossible in practice but defensively
|
||||
// returns an empty body so the buffer still has a non-nil marker.
|
||||
func a2aErrorResponseBody(pe *proxyA2AError) []byte {
|
||||
if pe == nil || pe.Response == nil {
|
||||
return nil
|
||||
}
|
||||
b, err := json.Marshal(pe.Response)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// canvasA2ASyncBudget is the extracted lookup for the cap-and-queue synchronous
|
||||
// wait (core#2751). Extracted from the ProxyA2A handler so the default value
|
||||
// can be unit-tested directly without source-string matching — a regression of
|
||||
|
||||
@@ -3009,12 +3009,29 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
|
||||
handler.ProxyA2A(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected 202 Accepted (queued), got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), `"queued"`) {
|
||||
t.Errorf("expected queued ack, got: %s", w.Body.String())
|
||||
}
|
||||
// #2818 — the new wire shape carries task_id as the
|
||||
// primary correlation key the canvas uses to recover
|
||||
// the result via the GET /a2a/task/{task_id} endpoint.
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response is not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "queued" {
|
||||
t.Errorf("response.status = %v, want %q", resp["status"], "queued")
|
||||
}
|
||||
if resp["delivery_mode"] != "push-async" {
|
||||
t.Errorf("response.delivery_mode = %v, want %q", resp["delivery_mode"], "push-async")
|
||||
}
|
||||
taskID, _ := resp["task_id"].(string)
|
||||
if taskID == "" {
|
||||
t.Errorf("response.task_id missing (required for the late-arrival recovery path)")
|
||||
}
|
||||
// Returned at ~budget, NOT after the (blocked) agent — proves the cap fired.
|
||||
if elapsed > 2*time.Second {
|
||||
t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed)
|
||||
@@ -3236,9 +3253,10 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
|
||||
handler.ProxyA2A(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// 1. The HTTP response is the queued ack (not the agent reply).
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
||||
// 1. The HTTP response is the queued ack (202 + task_id, not
|
||||
// the agent reply). #2818 wire shape.
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected 202 Accepted (queued), got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), `"queued"`) {
|
||||
t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String())
|
||||
@@ -3246,6 +3264,16 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
|
||||
if !strings.Contains(w.Body.String(), `"push-async"`) {
|
||||
t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String())
|
||||
}
|
||||
// task_id is the new primary correlation key the canvas
|
||||
// uses to recover the result via the GET /a2a/task/{task_id}
|
||||
// endpoint. Without it, the late-arrival fallback is impossible.
|
||||
var e2eResp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &e2eResp); err != nil {
|
||||
t.Fatalf("response is not valid JSON: %v", err)
|
||||
}
|
||||
if e2eResp["task_id"] == nil || e2eResp["task_id"] == "" {
|
||||
t.Errorf("response.task_id missing (required for the late-arrival recovery path): %s", w.Body.String())
|
||||
}
|
||||
// Returned at ~budget, NOT after the (blocked) agent.
|
||||
if elapsed > 300*time.Millisecond {
|
||||
t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed)
|
||||
|
||||
@@ -0,0 +1,342 @@
|
||||
package handlers
|
||||
|
||||
// a2a_task_store.go — in-memory task store for the canvas cap-and-queue
|
||||
// async-dispatch contract (core#2751, #2818).
|
||||
//
|
||||
// The cap-and-queue goroutine acks the canvas with `{status:"queued"}` (now
|
||||
// 202 + task_id) well before the agent finishes its turn, so the canvas
|
||||
// gets a prompt reply and the long turn completes on a detached forward
|
||||
// ctx. The agent's eventual reply lands via the AGENT_MESSAGE WebSocket
|
||||
// broadcast (the existing poll-mode contract).
|
||||
//
|
||||
// That's the happy path. The unhappy paths are what this store protects
|
||||
// against:
|
||||
//
|
||||
// 1. WS push is best-effort — a dropped WebSocket (network blip, canvas
|
||||
// hard-refresh between push and client registration, proxy restart
|
||||
// mid-push) loses the reply. The canvas needs a way to recover the
|
||||
// result without resubmitting the turn.
|
||||
//
|
||||
// 2. The cap-and-queue goroutine completes AFTER the canvas HTTP reply
|
||||
// but BEFORE the client registered the WS handler. The WS push fires
|
||||
// into the void.
|
||||
//
|
||||
// 3. Concurrent Decide / withdraw / kill-restart cycles can interleave
|
||||
// with the agent's reply, and a second caller reading a half-written
|
||||
// state would observe a torn result.
|
||||
//
|
||||
// The store is the durable buffer for (1) + (2): every cap-and-queue
|
||||
// ack creates a TaskHandle, the detached goroutine writes the result via
|
||||
// `Complete` when the agent replies, and the canvas can poll
|
||||
// `GET /workspaces/:id/a2a/task/{task_id}` to recover. A 5-minute TTL +
|
||||
// janitor goroutine bounds the memory footprint (5min × peak concurrent
|
||||
// tasks; with a 90s budget + Cloudflare's 100s edge, the worst-case
|
||||
// "task in flight" duration is ~30min including the agent turn itself,
|
||||
// but the canvas can recover within 5min of the queued ack so the TTL
|
||||
// is sized for that recovery window, not the full agent turn).
|
||||
//
|
||||
// Concurrency: package-level singleton (mu sync.RWMutex) — the store is
|
||||
// pure in-process state, no DB, no per-handler state. A single instance
|
||||
// per process is the natural shape; if a future deploy needs per-handler
|
||||
// isolation (e.g. a multi-tenant SaaS that wants one store per tenant
|
||||
// for blast-radius reasons), this is the place to refactor.
|
||||
//
|
||||
// Late-arrival safety: Complete is idempotent (first terminal wins —
|
||||
// guarded by a CAS on the `completed` flag). A WS push that arrives
|
||||
// after a poll already recovered the result is a no-op. This is the
|
||||
// contract pin in TestTaskStore_LateArrivalRace.
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Sentinel errors of the A2A task store.
var (
	// ErrTaskNotFound is what Get returns for a task_id that is not in
	// the store, whether it was evicted by the janitor after its TTL or
	// never existed (typo, stale handle from a previous deploy, a GET
	// that narrowly lost a race with the janitor). The two cases are
	// deliberately indistinguishable: telling them apart would let a
	// caller enumerate task_ids, which has no legitimate use.
	ErrTaskNotFound = errors.New("a2a task: not found")

	// ErrTaskAlreadyCompleted is what Complete returns when the task
	// already has a terminal state. It is distinct from ErrTaskNotFound
	// so tests can pin the idempotency contract directly: a second
	// Complete after a successful first one MUST return this error; it
	// must not silently overwrite the result and must not panic.
	ErrTaskAlreadyCompleted = errors.New("a2a task: already completed")
)
|
||||
|
||||
// TaskStatus names the lifecycle state of a TaskHandle.
type TaskStatus string

// Lifecycle states of a TaskHandle: one initial state and two terminal
// states.
const (
	// TaskStatusPending is the state a handle starts in: the dispatch
	// is still in flight and no result has been buffered.
	TaskStatusPending = TaskStatus("pending")

	// TaskStatusCompleted is the terminal state recorded through
	// Complete for a successful agent reply. The buffered TaskResult
	// carries the reply's HTTP status and body.
	TaskStatusCompleted = TaskStatus("completed")

	// TaskStatusFailed is the terminal state recorded through Complete
	// for a non-2xx / transport / dispatch error. The buffered
	// TaskResult carries the proxy-side status code; its body may be
	// empty.
	TaskStatusFailed = TaskStatus("failed")
)
|
||||
|
||||
// TaskResult is the buffered outcome of an A2A task. It holds the same
// three pieces the synchronous path would have handed to the canvas
// (status code, body, content type), so a later GET can replay them.
type TaskResult struct {
	// Status is the HTTP status code the canvas would have seen had it
	// held the connection: 200 for a successful dispatch, 4xx/5xx for
	// an error. It is never the 202 of the queued ack, which is not a
	// result.
	Status int

	// Body is the raw response body: the JSON-RPC envelope on success,
	// or an {"error": "..."} envelope for a dispatch failure. It may be
	// nil or empty for a failed task whose body was lost to a transport
	// error.
	Body []byte

	// ContentType is the Content-Type of the response. Almost always
	// "application/json" (the proxy's default); kept as a field so a
	// future extension with other payload types needs no change to the
	// store.
	ContentType string
}
|
||||
|
||||
// TaskHandle is the in-memory record for a single cap-and-queue
|
||||
// async-dispatch. Created by NewTaskHandle (called from ProxyA2A
|
||||
// when the budget timer fires), completed by the detached goroutine
|
||||
// when proxyA2ARequest returns, retrievable by task_id via the
|
||||
// GET endpoint for canvas-side late-arrival recovery.
|
||||
type TaskHandle struct {
|
||||
// ID is the unique correlation key the canvas uses to
|
||||
// reconcile the eventual WS push OR poll this store. UUID
|
||||
// v4 so a caller can't enumerate neighboring task_ids.
|
||||
ID string
|
||||
|
||||
// WorkspaceID is the workspace the task was raised against.
|
||||
// The GET endpoint authz-checks against this — a caller
|
||||
// from workspace A cannot poll a task_id created by
|
||||
// workspace B (the cross-workspace approval-gate class
|
||||
// from #2574 / #2593, mirrored here for consistency).
|
||||
WorkspaceID string
|
||||
|
||||
// Method is the A2A method name (e.g. "message/send") the
|
||||
// task was created for. Preserved so a future generic
|
||||
// store (other A2A methods) doesn't lose the dispatch
|
||||
// semantics.
|
||||
Method string
|
||||
|
||||
// CreatedAt is when the handle was created (== the moment
|
||||
// the cap-and-queue budget fired). The janitor uses this
|
||||
// to compute age for the TTL eviction.
|
||||
CreatedAt time.Time
|
||||
|
||||
// mu guards the mutable state below. RWMutex so a burst
|
||||
// of GETs (the late-arrival polling path) doesn't serialize
|
||||
// against each other.
|
||||
mu sync.RWMutex
|
||||
// status is the current lifecycle state. Read-mostly
|
||||
// after Complete fires, so RWMutex is the right shape.
|
||||
status TaskStatus
|
||||
// result is the buffered reply, populated by Complete.
|
||||
// nil until status moves out of pending.
|
||||
result *TaskResult
|
||||
}
|
||||
|
||||
// Status returns the current lifecycle state. Safe for concurrent
|
||||
// reads — takes the read lock so a concurrent Complete can't tear
|
||||
// the read.
|
||||
func (t *TaskHandle) Status() TaskStatus {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.status
|
||||
}
|
||||
|
||||
// Result returns the buffered reply + a bool indicating whether
|
||||
// the task is in a terminal state. The bool is true once status
|
||||
// has moved to completed/failed; before that, result is nil and
|
||||
// the bool is false (caller should keep polling).
|
||||
//
|
||||
// Returned by-value (not by-pointer) so a caller can't observe a
|
||||
// later mutation without re-reading. The Result pointer inside
|
||||
// the handle is replaced atomically by Complete, so even a caller
|
||||
// that captured a pointer would see the old value if Complete
|
||||
// fires after; by-value copies the *pointer*, which is a known
|
||||
// limitation — but the *content* is immutable post-Complete (the
|
||||
// TaskResult struct itself isn't mutated after assignment), so
|
||||
// the captured pointer is safe to read.
|
||||
func (t *TaskHandle) Result() (TaskResult, bool) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
if t.status == TaskStatusPending || t.result == nil {
|
||||
return TaskResult{}, false
|
||||
}
|
||||
return *t.result, true
|
||||
}
|
||||
|
||||
// Complete moves the handle to a terminal state, buffering the
|
||||
// result. Idempotent: a second call after a successful first one
|
||||
// returns ErrTaskAlreadyCompleted (does NOT overwrite the result).
|
||||
// This is the contract pin for the late-arrival race: a WS push
|
||||
// that fires AFTER a poll already recovered the result is a no-op
|
||||
// at the store layer; the canvas side then sees the same body
|
||||
// either way and the only cost is one wasted broadcast.
|
||||
//
|
||||
// The CAS pattern (read status, write status if pending) is
|
||||
// correct under concurrent Complete calls: exactly one caller
|
||||
// wins, the other gets ErrTaskAlreadyCompleted. The first
|
||||
// winner's result is what survives.
|
||||
func (t *TaskHandle) Complete(status TaskStatus, result TaskResult) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.status != TaskStatusPending {
|
||||
return ErrTaskAlreadyCompleted
|
||||
}
|
||||
t.status = status
|
||||
// Defensive copy so a caller mutating their local TaskResult
|
||||
// after Complete returns can't change the buffered result.
|
||||
copied := result
|
||||
if result.Body != nil {
|
||||
copied.Body = append([]byte(nil), result.Body...)
|
||||
}
|
||||
t.result = &copied
|
||||
return nil
|
||||
}
|
||||
|
||||
// A2ATaskStore is the package-level in-memory store. Singleton
|
||||
// per process; see file header for the multi-tenant refactor
|
||||
// note. RWMutex so the GET-side polling path (which can fire
|
||||
// every 5s per pending task) doesn't serialize against the
|
||||
// Create/Complete paths.
|
||||
type A2ATaskStore struct {
|
||||
mu sync.RWMutex
|
||||
tasks map[string]*TaskHandle
|
||||
janitor *time.Ticker
|
||||
stop chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// Timing parameters of the task store janitor.
const (
	// a2aTaskStoreTTL is the age, measured from TaskHandle.CreatedAt,
	// beyond which the janitor evicts a handle. It bounds the window in
	// which the canvas can recover a missed WS push by polling (design
	// #2818). It is not an agent-runtime limit: a turn that outlives the
	// TTL can still finish, but its result is then no longer retrievable
	// through the task_id lookup.
	a2aTaskStoreTTL = 5 * time.Minute

	// a2aTaskStoreJanitorInterval is the period between janitor sweeps.
	// One minute keeps the map from growing without bound while keeping
	// the full-map scan off any hot path.
	a2aTaskStoreJanitorInterval = time.Minute
)
|
||||
|
||||
// a2aTaskStore is the package-level singleton. Initialized
|
||||
// lazily on first NewTaskHandle call so tests that don't
|
||||
// touch the store don't pay the janitor goroutine cost.
|
||||
var a2aTaskStore = &A2ATaskStore{
|
||||
tasks: make(map[string]*TaskHandle),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
// init starts the janitor on package import. The cost is one
|
||||
// goroutine per process; the benefit is that the store is
|
||||
// always ready when ProxyA2A first fires. Tests that don't
|
||||
// touch the store still pay the goroutine cost — acceptable
|
||||
// because the janitor only fires every 60s and does an O(N)
|
||||
// scan of a map that's almost always empty.
|
||||
func init() {
|
||||
a2aTaskStore.once.Do(func() {
|
||||
a2aTaskStore.janitor = time.NewTicker(a2aTaskStoreJanitorInterval)
|
||||
go a2aTaskStore.runJanitor()
|
||||
})
|
||||
}
|
||||
|
||||
// runJanitor is the periodic eviction loop. Runs forever until
|
||||
// process exit; no shutdown path because the process is
|
||||
// short-lived (the platform restarts on deploy, and the OS
|
||||
// reclaims the map on exit).
|
||||
func (s *A2ATaskStore) runJanitor() {
|
||||
for {
|
||||
select {
|
||||
case <-s.janitor.C:
|
||||
s.pruneExpired()
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pruneExpired evicts handles older than a2aTaskStoreTTL.
|
||||
// O(N) over the map; N is bounded by the peak concurrent
|
||||
// in-flight tasks (typically <100 even on a busy platform).
|
||||
// The take-and-release lock pattern (copy keys under read
|
||||
// lock, delete under write lock) is the standard Go map
|
||||
// iteration under lock idiom — delete-during-iteration
|
||||
// would invalidate the cursor.
|
||||
func (s *A2ATaskStore) pruneExpired() {
|
||||
cutoff := time.Now().Add(-a2aTaskStoreTTL)
|
||||
var expired []string
|
||||
s.mu.RLock()
|
||||
for id, t := range s.tasks {
|
||||
if t.CreatedAt.Before(cutoff) {
|
||||
expired = append(expired, id)
|
||||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
if len(expired) == 0 {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
for _, id := range expired {
|
||||
delete(s.tasks, id)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// NewTaskHandle creates a new pending TaskHandle, inserts it
|
||||
// into the store, and returns it. The caller (ProxyA2A's
|
||||
// cap-and-queue branch) embeds the returned ID in the 202
|
||||
// response so the canvas can correlate.
|
||||
func (s *A2ATaskStore) NewTaskHandle(workspaceID, method string) *TaskHandle {
|
||||
t := &TaskHandle{
|
||||
ID: uuid.NewString(),
|
||||
WorkspaceID: workspaceID,
|
||||
Method: method,
|
||||
CreatedAt: time.Now(),
|
||||
status: TaskStatusPending,
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.tasks[t.ID] = t
|
||||
s.mu.Unlock()
|
||||
return t
|
||||
}
|
||||
|
||||
// Get retrieves a TaskHandle by ID. Returns ErrTaskNotFound
|
||||
// for expired or never-existed handles. The authz check
|
||||
// (workspace_id match) is the caller's responsibility —
|
||||
// Get returns the handle regardless of workspace, the
|
||||
// handler then checks handle.WorkspaceID against the
|
||||
// caller's workspace.
|
||||
func (s *A2ATaskStore) Get(taskID string) (*TaskHandle, error) {
|
||||
s.mu.RLock()
|
||||
t, ok := s.tasks[taskID]
|
||||
s.mu.RUnlock()
|
||||
if !ok {
|
||||
return nil, ErrTaskNotFound
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
@@ -0,0 +1,315 @@
|
||||
package handlers
|
||||
|
||||
// a2a_task_store_test.go — unit tests for the in-memory task store
|
||||
// (core#2751 / #2818 async-dispatch contract). The store is the
|
||||
// durable buffer for the cap-and-queue's late-arrival path: a
|
||||
// canvas that misses the WS push polls the store via the
|
||||
// GET /workspaces/:id/a2a/task/{task_id} endpoint to recover
|
||||
// the buffered result. Tests pin:
|
||||
// - Lifecycle: NewTaskHandle → Complete → Get round-trip
|
||||
// - Concurrency: parallel Complete calls (only one wins — the
|
||||
// first terminal is the durable result)
|
||||
// - Late-arrival race: Get after Complete returns the buffered
|
||||
// result; a second Complete is a no-op
|
||||
// - TTL eviction: handles older than a2aTaskStoreTTL are pruned
|
||||
// - Error paths: ErrTaskNotFound for expired/missing,
|
||||
// ErrTaskAlreadyCompleted for double-Complete
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestTaskStore_NewPendingHandle_GeneratesUniqueIDs pins the
|
||||
// contract that every NewTaskHandle returns a fresh UUID v4
|
||||
// (no collisions even under burst). Two handles from the same
|
||||
// store must have different IDs.
|
||||
func TestTaskStore_NewPendingHandle_GeneratesUniqueIDs(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
h1 := s.NewTaskHandle("ws-1", "message/send")
|
||||
h2 := s.NewTaskHandle("ws-1", "message/send")
|
||||
if h1.ID == h2.ID {
|
||||
t.Errorf("expected unique IDs, got %q twice", h1.ID)
|
||||
}
|
||||
if h1.Status() != TaskStatusPending {
|
||||
t.Errorf("new handle should be pending, got %q", h1.Status())
|
||||
}
|
||||
if h1.WorkspaceID != "ws-1" {
|
||||
t.Errorf("WorkspaceID = %q, want ws-1", h1.WorkspaceID)
|
||||
}
|
||||
if h1.Method != "message/send" {
|
||||
t.Errorf("Method = %q, want message/send", h1.Method)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Complete_StoresResult — happy path: complete
|
||||
// moves the handle out of pending, Result returns the buffered
|
||||
// shape.
|
||||
func TestTaskStore_Complete_StoresResult(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
h := s.NewTaskHandle("ws-1", "message/send")
|
||||
body := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hi"}}`)
|
||||
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{
|
||||
Status: 200,
|
||||
Body: body,
|
||||
ContentType: "application/json",
|
||||
}); err != nil {
|
||||
t.Fatalf("first Complete: %v", err)
|
||||
}
|
||||
if got := h.Status(); got != TaskStatusCompleted {
|
||||
t.Errorf("Status after Complete = %q, want completed", got)
|
||||
}
|
||||
result, ok := h.Result()
|
||||
if !ok {
|
||||
t.Fatal("Result should be available after Complete")
|
||||
}
|
||||
if result.Status != 200 {
|
||||
t.Errorf("Result.Status = %d, want 200", result.Status)
|
||||
}
|
||||
if !bytes.Equal(result.Body, body) {
|
||||
t.Errorf("Result.Body = %q, want %q", result.Body, body)
|
||||
}
|
||||
if result.ContentType != "application/json" {
|
||||
t.Errorf("Result.ContentType = %q, want application/json", result.ContentType)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Complete_Idempotent — the load-bearing
|
||||
// contract pin. A second Complete on an already-terminal
|
||||
// handle returns ErrTaskAlreadyCompleted AND does NOT
|
||||
// overwrite the buffered result. This is what protects
|
||||
// the late-arrival race: a WS push that fires after a
|
||||
// poll already recovered the result is a no-op at the
|
||||
// store layer; the first Complete wins.
|
||||
func TestTaskStore_Complete_Idempotent(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
h := s.NewTaskHandle("ws-1", "message/send")
|
||||
|
||||
first := []byte(`{"result":"first"}`)
|
||||
second := []byte(`{"result":"second"}`)
|
||||
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: first}); err != nil {
|
||||
t.Fatalf("first Complete: %v", err)
|
||||
}
|
||||
err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: second})
|
||||
if err != ErrTaskAlreadyCompleted {
|
||||
t.Errorf("second Complete: want ErrTaskAlreadyCompleted, got %v", err)
|
||||
}
|
||||
// First result survives.
|
||||
result, ok := h.Result()
|
||||
if !ok {
|
||||
t.Fatal("Result should be available after first Complete")
|
||||
}
|
||||
if !bytes.Equal(result.Body, first) {
|
||||
t.Errorf("second Complete overwrote result: got %q, want %q", result.Body, first)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Complete_ConcurrentFirstWins — the concurrent
|
||||
// version of the idempotency contract. Multiple goroutines
|
||||
// race to Complete the same handle; exactly one wins, the
|
||||
// others get ErrTaskAlreadyCompleted, and the surviving
|
||||
// result is the one from the winning goroutine.
|
||||
func TestTaskStore_Complete_ConcurrentFirstWins(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
h := s.NewTaskHandle("ws-1", "message/send")
|
||||
|
||||
const N = 50
|
||||
var winners atomic.Int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(N)
|
||||
for i := 0; i < N; i++ {
|
||||
i := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
body := []byte(`{"id":` + itoaForTest(i) + `}`)
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: body}); err == nil {
|
||||
winners.Add(1)
|
||||
} else if err != ErrTaskAlreadyCompleted {
|
||||
t.Errorf("goroutine %d: unexpected error %v", i, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if got := winners.Load(); got != 1 {
|
||||
t.Errorf("expected exactly 1 winner, got %d", got)
|
||||
}
|
||||
// Result is the winning goroutine's body — we don't know
|
||||
// which one, but it must be non-nil and parseable as the
|
||||
// shape Complete wrote.
|
||||
result, ok := h.Result()
|
||||
if !ok {
|
||||
t.Fatal("Result should be available after concurrent Complete")
|
||||
}
|
||||
if len(result.Body) == 0 {
|
||||
t.Error("winning Complete produced empty body")
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Get_UnknownTaskID — the basic error path.
|
||||
// Get for a task_id that never existed returns
|
||||
// ErrTaskNotFound.
|
||||
func TestTaskStore_Get_UnknownTaskID(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
if _, err := s.Get("00000000-0000-0000-0000-000000000000"); err != ErrTaskNotFound {
|
||||
t.Errorf("Get unknown: want ErrTaskNotFound, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Prune_RemovesExpired — the TTL eviction
|
||||
// contract. A handle older than a2aTaskStoreTTL is removed
|
||||
// from the store on the next pruneExpired call. Test
|
||||
// bypasses the real timer by calling pruneExpired
|
||||
// directly with a synthetically-aged handle.
|
||||
func TestTaskStore_Prune_RemovesExpired(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
h := s.NewTaskHandle("ws-1", "message/send")
|
||||
|
||||
// Backdate the handle past the TTL.
|
||||
h.CreatedAt = time.Now().Add(-2 * a2aTaskStoreTTL)
|
||||
|
||||
s.pruneExpired()
|
||||
|
||||
if _, err := s.Get(h.ID); err != ErrTaskNotFound {
|
||||
t.Errorf("expected expired handle to be pruned, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_LateArrivalRace — the contract-critical
|
||||
// integration test for #2818. The full flow:
|
||||
// 1. cap-and-queue creates the handle, returns task_id
|
||||
// 2. agent is slow; canvas polls via the GET endpoint
|
||||
// while the handle is still pending → 202
|
||||
// 3. agent replies; goroutine Completes the handle
|
||||
// 4. canvas polls again → 200 with the buffered result
|
||||
// 5. WS push fires after the poll already recovered
|
||||
// the result; the late-arrival Complete is a no-op
|
||||
// (ErrTaskAlreadyCompleted)
|
||||
func TestTaskStore_LateArrivalRace(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
h := s.NewTaskHandle("ws-1", "message/send")
|
||||
|
||||
// Step 2: poll while pending.
|
||||
if h.Status() != TaskStatusPending {
|
||||
t.Fatalf("expected pending, got %q", h.Status())
|
||||
}
|
||||
if _, ok := h.Result(); ok {
|
||||
t.Fatal("Result should not be available before Complete")
|
||||
}
|
||||
|
||||
// Step 3: agent replies.
|
||||
reply := []byte(`{"jsonrpc":"2.0","id":"req-1","result":{"reply":"hello"}}`)
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: reply}); err != nil {
|
||||
t.Fatalf("Complete: %v", err)
|
||||
}
|
||||
|
||||
// Step 4: poll after Complete returns the buffered result.
|
||||
result, ok := h.Result()
|
||||
if !ok {
|
||||
t.Fatal("Result should be available after Complete")
|
||||
}
|
||||
if !bytes.Equal(result.Body, reply) {
|
||||
t.Errorf("Result.Body = %q, want %q", result.Body, reply)
|
||||
}
|
||||
|
||||
// Step 5: a late-arrival Complete (e.g. WS push firing
|
||||
// after the canvas's poll already recovered) is a
|
||||
// no-op — the second call returns ErrTaskAlreadyCompleted
|
||||
// and does NOT overwrite the first result.
|
||||
if err := h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: []byte(`{"late":"winner"}`)}); err != ErrTaskAlreadyCompleted {
|
||||
t.Errorf("late-arrival Complete: want ErrTaskAlreadyCompleted, got %v", err)
|
||||
}
|
||||
result2, _ := h.Result()
|
||||
if !bytes.Equal(result2.Body, reply) {
|
||||
t.Errorf("late-arrival Complete overwrote result: got %q, want %q", result2.Body, reply)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_ConcurrentAccess — many goroutines
|
||||
// creating + completing + getting handles, no data race.
|
||||
// Run with `go test -race` to catch the race detector.
|
||||
func TestTaskStore_ConcurrentAccess(t *testing.T) {
|
||||
s := newIsolatedTaskStore(t)
|
||||
|
||||
const N = 100
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < N; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
h := s.NewTaskHandle("ws-1", "message/send")
|
||||
_ = h.Complete(TaskStatusCompleted, TaskResult{Status: 200, Body: []byte(`ok`)})
|
||||
_, _ = s.Get(h.ID)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestExtractA2AMethod pins the lightweight JSON peek used
|
||||
// by the cap-and-queue branch to record which A2A method
|
||||
// raised the task. Tolerant of malformed input (returns
|
||||
// "" on parse error so the task_id is still issued).
|
||||
func TestExtractA2AMethod(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
body string
|
||||
want string
|
||||
}{
|
||||
{"message/send", `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{}}`, "message/send"},
|
||||
{"message/stream", `{"method":"message/stream"}`, "message/stream"},
|
||||
{"initialize", `{"method":"initialize"}`, "initialize"},
|
||||
{"missing-method", `{"jsonrpc":"2.0","id":"1","params":{}}`, ""},
|
||||
{"non-string-method", `{"method":42}`, ""},
|
||||
{"malformed", `{"method":`, ""},
|
||||
{"empty", ``, ""},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
if got := extractA2AMethod([]byte(c.body)); got != c.want {
|
||||
t.Errorf("extractA2AMethod(%q) = %q, want %q", c.body, got, c.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// newIsolatedTaskStore creates a fresh A2ATaskStore for the
|
||||
// test, replacing the package-level singleton. Avoids the
|
||||
// real janitor goroutine and the shared-state bleed that
|
||||
// would happen if two tests used the same store.
|
||||
func newIsolatedTaskStore(t *testing.T) *A2ATaskStore {
|
||||
t.Helper()
|
||||
return &A2ATaskStore{
|
||||
tasks: make(map[string]*TaskHandle),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// itoaForTest formats i as a base-10 string. It exists so the concurrent
// test can build distinct bodies without another import, and it is named
// itoaForTest to avoid colliding with the itoa helper in
// class1_ast_gate_test.go.
//
// Digits are accumulated on the negative side of zero. Every int has a
// negative counterpart, but the most negative int has no positive one, so
// negating a negative input would overflow for that value.
func itoaForTest(i int) string {
	if i == 0 {
		return "0"
	}
	neg := i < 0
	if !neg {
		i = -i // always safe: the positive range is the smaller one
	}

	// 20 bytes fits the longest 64-bit value including its sign.
	var buf [20]byte
	pos := len(buf)
	for i < 0 {
		pos--
		// i%10 is in [-9, 0] here, so subtracting it yields '0'..'9'.
		buf[pos] = byte('0' - i%10)
		i /= 10
	}
	if neg {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}
|
||||
@@ -250,6 +250,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// org-level token). Existence-non-inferring 404 on auth failure.
|
||||
r.GET("/workspaces/:id/a2a/queue/:queue_id", wh.GetA2AQueueStatus)
|
||||
|
||||
// A2A task lookup (#2818) — the late-arrival recovery path for
|
||||
// the cap-and-queue async-dispatch contract. Canvas polls this
|
||||
// every 5s when the WS push is missed, looking for the buffered
|
||||
// result. Same routing rationale as /a2a/queue/:queue_id (auth
|
||||
// is per-handler against the task's WorkspaceID).
|
||||
r.GET("/workspaces/:id/a2a/task/:taskId", wh.GetA2ATask)
|
||||
|
||||
// Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a.
|
||||
// Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13
|
||||
// by requiring a valid bearer token for any workspace that has one on file.
|
||||
|
||||
Reference in New Issue
Block a user