feat(workspace-server#2751): async-dispatch contract — 202+task_id, polling fallback, race buffer #2818
@@ -29,6 +29,14 @@ interface A2AResponse {
|
||||
* reply inside status.message.parts. */
|
||||
status?: { message?: { parts?: A2APart[] } };
|
||||
};
|
||||
/** Async-dispatch contract expansion (core#2751): set when ws-server's
|
||||
* cap-and-queue fires (HTTP 202 Accepted) — the canvas correlates
|
||||
* the A2A_RESPONSE WS push to this id and falls back to polling
|
||||
* GET /workspaces/:id/a2a/task/{task_id} if the WS push is missed.
|
||||
* Polled result has the same shape as the inline result envelope
|
||||
* (`{result: {parts|status.message|artifacts: ...}}`), so the same
|
||||
* extractReplyText/extractFilesFromTask pipeline applies. */
|
||||
task_id?: string;
|
||||
/** Set by ws-server's poll-mode short-circuit in `proxyA2ARequest`
|
||||
* (a2a_proxy.go:416-431) when the target workspace is registered as
|
||||
* `delivery_mode=poll` — e.g. an operator's laptop running
|
||||
@@ -175,6 +183,17 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
const pendingWSTokensRef = useRef<Set<number>>(new Set());
|
||||
const wsCompletedTokensRef = useRef<Set<number>>(new Set());
|
||||
const messageIdToTokenRef = useRef<Map<string, number>>(new Map());
|
||||
// core#2751 async-dispatch expansion: task_id correlation. The 202 ack
|
||||
// carries a task_id; the A2A_RESPONSE WS push carries the same task_id;
|
||||
// the GET /task/{task_id} polling endpoint serves the buffered result.
|
||||
// The map is the primary correlation key for the async-dispatch path
|
||||
// (message_id is the legacy fallback for ws-server builds that pre-date
|
||||
// the expansion).
|
||||
const taskIdToTokenRef = useRef<Map<string, number>>(new Map());
|
||||
// Pending polling timers keyed by task_id. Cancelled when the send
|
||||
// completes (via finishSendByTaskId or finishSendByMessageId), so a
|
||||
// fast WS-delivered reply doesn't trigger a redundant poll.
|
||||
const pendingTaskPollsRef = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());
|
||||
const sendingFromAPIRef = useRef(false);
|
||||
const nextTokenRef = useRef(1);
|
||||
const setupGuardRef = useRef(false);
|
||||
@@ -199,6 +218,21 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// core#2751: clean up the task_id mapping AND cancel any pending
|
||||
// polling timer for this token. The poll's late-finish path
|
||||
// re-checks the token's membership in inFlight/pendingWS, so
|
||||
// a no-op cancel is safe.
|
||||
for (const [tid, tok] of taskIdToTokenRef.current) {
|
||||
if (tok === token) {
|
||||
taskIdToTokenRef.current.delete(tid);
|
||||
const t = pendingTaskPollsRef.current.get(tid);
|
||||
if (t !== undefined) {
|
||||
clearTimeout(t);
|
||||
pendingTaskPollsRef.current.delete(tid);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
syncSendingState();
|
||||
}, [syncSendingState]);
|
||||
|
||||
@@ -209,6 +243,75 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
}
|
||||
}, [finishSendToken]);
|
||||
|
||||
// core#2751 async-dispatch expansion: complete a send identified by the
// task_id from the 202 ack. Intended for the WS-push path, where the
// A2A_RESPONSE event carries the same task_id as the ack.
//
// Unknown task_ids are tolerated: no token is finished, but any stale poll
// timer registered under that id is still cancelled. Servers that never
// send a task_id are handled by finishSendByMessageId instead.
const finishSendByTaskId = useCallback((taskId: string) => {
  const owningToken = taskIdToTokenRef.current.get(taskId);
  if (owningToken !== undefined) finishSendToken(owningToken);

  // Whoever delivered the reply (WS push or poll) has won; make sure no
  // further poll fires for this task.
  const pollTimer = pendingTaskPollsRef.current.get(taskId);
  if (pollTimer === undefined) return;
  clearTimeout(pollTimer);
  pendingTaskPollsRef.current.delete(taskId);
}, [finishSendToken]);
|
||||
|
||||
// scheduleTaskPoll arms the polling fallback for an async-dispatched send:
// GET /api/workspaces/:id/a2a/task/{task_id}. The first poll fires
// POLL_DELAY_MS after the 202 ack so the WS push gets first crack at
// delivering the reply.
//
// The poll re-arms itself while the task has no `result` yet or the request
// fails transiently. A task that was queued because it outlived the sync
// budget is usually still running when the first poll fires, so a one-shot
// poll would leave the spinner up forever whenever the WS push is missed.
//
// Polling stops when any of the following holds:
//   - the send finished some other way (finishSendToken / finishSendByTaskId
//     clear the timer stored in pendingTaskPollsRef),
//   - a poll returns a result (the reply is surfaced and the send finished),
//   - the server answers 404 (unknown or expired task_id), or
//   - MAX_POLL_ATTEMPTS polls have been made.
//
// @param taskId task_id from the 202 ack; keys the timer in pendingTaskPollsRef.
// @param token  send token whose pending state gates every poll.
const scheduleTaskPoll = useCallback(
  (taskId: string, token: number) => {
    // Don't double-arm. (Defensive — the caller already guards.)
    if (pendingTaskPollsRef.current.has(taskId)) return;

    const POLL_DELAY_MS = 5_000;
    // 60 polls × 5s ≈ 5 min, matching the task-store TTL the server
    // comments describe; past that the buffered result is gone anyway.
    const MAX_POLL_ATTEMPTS = 60;

    const isSendPending = (): boolean =>
      inFlightTokensRef.current.has(token) ||
      pendingWSTokensRef.current.has(token);

    const arm = (attempt: number): void => {
      const timer = setTimeout(() => {
        pendingTaskPollsRef.current.delete(taskId);
        // The WS handler is supposed to cancel this timer, but re-check in
        // case a cancel was missed and the send already completed.
        if (!isSendPending()) return;

        const rearm = (): void => {
          if (attempt >= MAX_POLL_ATTEMPTS) return;
          if (!isSendPending()) return;
          if (pendingTaskPollsRef.current.has(taskId)) return;
          arm(attempt + 1);
        };

        // Set when the server reports the task as unknown/expired; there
        // is nothing left to poll for in that case.
        let taskGone = false;

        fetch(`/api/workspaces/${workspaceId}/a2a/task/${encodeURIComponent(taskId)}`)
          .then((r) => {
            if (r.status === 404) {
              taskGone = true;
              return null;
            }
            return r.ok ? r.json() : null;
          })
          .then((body: { result?: unknown } | null) => {
            // The WS push may have completed the send while the request
            // was in flight; surfacing the polled reply too would render
            // the message twice and finish an already-finished token.
            if (!isSendPending()) return;
            if (!body || !body.result) {
              // Still pending (or a non-OK response): try again later.
              if (!taskGone) rearm();
              return;
            }
            // The polled body is treated as the same envelope the inline
            // POST returns, so the same extractor applies.
            const replyText = extractReplyText(body as unknown as A2AResponse);
            if (replyText) {
              optionsRef.current.onAgentMessage?.(
                createMessage("agent", replyText, []),
              );
            }
            finishSendToken(token);
          })
          .catch(() => {
            // Network/parse failure: not worth surfacing mid-send, but do
            // keep the fallback alive so a transient blip is recoverable.
            rearm();
          });
      }, POLL_DELAY_MS);
      pendingTaskPollsRef.current.set(taskId, timer);
    };

    arm(1);
  },
  [workspaceId, finishSendToken],
);
|
||||
|
||||
const releaseSendGuards = useCallback((messageId?: string) => {
|
||||
// Token-aware completion: modern ws-server builds echo the client-generated
|
||||
// messageId, so we can finish EXACTLY the send that completed (CR2 #11466,
|
||||
@@ -384,6 +487,23 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
// the AGENT_MESSAGE WebSocket event after the agent's next
|
||||
// `wait_for_message` poll.
|
||||
//
|
||||
// core#2751 durable async-dispatch expansion: the cap-and-queue
|
||||
// path also returns `{status:"queued", task_id, delivery_mode:"push-async"}`
|
||||
// (HTTP 202 Accepted). The task_id is the primary correlation
|
||||
// key for the eventual A2A_RESPONSE WS push and the polling
|
||||
// fallback (GET /workspaces/:id/a2a/task/{task_id}). The
|
||||
// canvas keeps the spinner up; either path delivers the
|
||||
// terminal reply:
|
||||
// 1. WS push carrying the same task_id (fast path, sub-second
|
||||
// turns) — useChatSocket's onAgentMessage finishes the
|
||||
// send by task_id.
|
||||
// 2. Polling fallback — see scheduleTaskPoll above. Fires
|
||||
// after 5s of no WS message for the task; recovers the
|
||||
// reply if the WS push was missed (canvas hard-refresh,
|
||||
// network blip). The GET endpoint serves the buffered
|
||||
// result from the ws-server task store (5min TTL), so
|
||||
// late polls still see the answer.
|
||||
//
|
||||
// Keep the spinner up by moving the token to the WS-pending set;
|
||||
// releaseSendGuards will prune it when the AGENT_MESSAGE lands
|
||||
// (handled by useChatSocket `onAgentMessage`/`onSendComplete`)
|
||||
@@ -395,8 +515,17 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
// path, finish immediately instead of re-pending forever.
|
||||
if (wsCompletedTokensRef.current.has(myToken)) {
|
||||
finishSendToken(myToken);
|
||||
} else {
|
||||
pendSendTokenForWS(myToken);
|
||||
return;
|
||||
}
|
||||
pendSendTokenForWS(myToken);
|
||||
// task_id correlation + polling fallback (core#2751 expansion).
|
||||
// The task_id is the durable handle for this send; the
|
||||
// polling timer is a one-shot that auto-cancels on send
|
||||
// completion (finishSendByTaskId/finishSendByMessageId)
|
||||
// via the pendingTaskPollsRef cleanup.
|
||||
if (resp?.task_id) {
|
||||
taskIdToTokenRef.current.set(resp.task_id, myToken);
|
||||
scheduleTaskPoll(resp.task_id, myToken);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -465,7 +594,7 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
setupGuardRef.current = false;
|
||||
});
|
||||
},
|
||||
[workspaceId, uploading, syncSendingState, finishSendToken, pendSendTokenForWS],
|
||||
[workspaceId, uploading, syncSendingState, finishSendToken, pendSendTokenForWS, scheduleTaskPoll],
|
||||
);
|
||||
|
||||
return {
|
||||
@@ -476,6 +605,10 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
|
||||
clearError,
|
||||
releaseSendGuards,
|
||||
finishSendByMessageId,
|
||||
// core#2751 async-dispatch expansion: exposed for useChatSocket's
|
||||
// onAgentMessage to release the guard by task_id (the primary
|
||||
// correlation key for the 202+task_id ack path).
|
||||
finishSendByTaskId,
|
||||
sendingFromAPIRef,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -46,7 +46,13 @@ export function useChatSocket(
|
||||
// Each consumed message may correspond to a distinct completed send.
|
||||
// Finish the specific token by messageId; legacy payloads without an
|
||||
// id fall back to the coarse release path.
|
||||
callbacksRef.current.onSendComplete?.(m.messageId);
|
||||
//
|
||||
// core#2751 async-dispatch expansion: when the WS push carries
|
||||
// a taskId, the cap-and-queue ack path keys off it. Call
|
||||
// onSendComplete with the taskId so the in-flight send guard
|
||||
// releases by task_id (the new primary correlation key). The
|
||||
// ChatTab layer routes the taskId vs messageId distinction.
|
||||
callbacksRef.current.onSendComplete?.(m.taskId || m.messageId);
|
||||
}
|
||||
}, [pendingAgentMsgs, workspaceId]);
|
||||
|
||||
|
||||
@@ -499,6 +499,14 @@ export function handleCanvasEvent(
|
||||
const { agentMessages } = get();
|
||||
const existing = agentMessages[msg.workspace_id] || [];
|
||||
const messageId = typeof msg.payload.message_id === "string" ? msg.payload.message_id : undefined;
|
||||
// core#2751 async-dispatch expansion: task_id is the
|
||||
// primary correlation key for the 202+task_id ack path.
|
||||
// The WS push carries the same task_id; ChatTab's send
|
||||
// pipeline keys off it to release the in-flight send
|
||||
// guard. Empty on legacy poll-mode / pre-expansion
|
||||
// ws-server builds — messageId correlation still works
|
||||
// (the legacy fallback).
|
||||
const taskId = typeof msg.payload.task_id === "string" ? msg.payload.task_id : undefined;
|
||||
set({
|
||||
agentMessages: {
|
||||
...agentMessages,
|
||||
@@ -509,6 +517,7 @@ export function handleCanvasEvent(
|
||||
content: text,
|
||||
timestamp: new Date().toISOString(),
|
||||
messageId,
|
||||
taskId,
|
||||
...(attachments.length > 0 ? { attachments } : {}),
|
||||
},
|
||||
],
|
||||
|
||||
@@ -275,8 +275,8 @@ interface CanvasState {
|
||||
batchPause: () => Promise<void>;
|
||||
batchDelete: () => Promise<void>;
|
||||
/** Agent-pushed messages keyed by workspace ID. ChatTab consumes and clears these. */
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; messageId?: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
|
||||
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; messageId?: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; messageId?: string; taskId?: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
|
||||
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; messageId?: string; taskId?: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
|
||||
/** WebSocket connection status — drives the live indicator in the Toolbar. */
|
||||
wsStatus: "connected" | "connecting" | "disconnected";
|
||||
setWsStatus: (status: "connected" | "connecting" | "disconnected") => void;
|
||||
|
||||
@@ -412,11 +412,48 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
body []byte
|
||||
perr *proxyA2AError
|
||||
}
|
||||
// ASYNC CONTRACT EXPANSION (core#2751 durable expansion): each
|
||||
// cap-and-queue ack gets a fresh task_id. The canvas correlates
|
||||
// the A2A_RESPONSE WS push to this id and falls back to polling
|
||||
// GET /workspaces/:id/a2a/task/:task_id if the WS push is missed.
|
||||
// The task_id is also the durable handle into the in-memory
|
||||
// store (a2a_task_store.go) so the canvas can recover the
|
||||
// result by polling even when the WS push was dropped.
|
||||
//
|
||||
// Status code: 202 Accepted (per the user/driver-approved
|
||||
// design — was 200 OK in the pre-expansion cap-and-queue;
|
||||
// 202 is the canonical "request acknowledged, processing async"
|
||||
// HTTP code and is what the canvas is now coded to recognize).
|
||||
//
|
||||
// We need the A2A method name (e.g. "message/send") in the
|
||||
// 202 ack body so the canvas's existing
|
||||
// `if (resp?.status === "queued")` branch can short-circuit
|
||||
// without a separate introspection. The full payload
|
||||
// normalization (id assignment, JSON-RPC wrap-if-missing)
|
||||
// still happens inside proxyA2ARequestWithTaskID, so we
|
||||
// peek just for the method here — non-mutating, idempotent.
|
||||
_, a2aMethodForAck, _ := normalizeA2APayload(body)
|
||||
taskHandle := getTaskStore().newPendingHandle(workspaceID)
|
||||
detached := context.WithoutCancel(ctx)
|
||||
budget := canvasA2ASyncBudget() // local copy for the time.After below
|
||||
done := make(chan a2aResult, 1)
|
||||
go func() {
|
||||
s, b, pe := h.proxyA2ARequest(detached, workspaceID, body, callerID, true, isCanvasUser)
|
||||
s, b, pe := h.proxyA2ARequestWithTaskID(detached, workspaceID, body, callerID, true, isCanvasUser, taskHandle.TaskID)
|
||||
// Record the terminal result in the task store so a
|
||||
// subsequent GET /task/{task_id} returns the buffered body
|
||||
// (the WS push is best-effort; the store is the durable
|
||||
// recovery path for missed WS pushes, e.g. a canvas
|
||||
// hard-refresh that lands between the agent's reply and
|
||||
// the WS event reaching the new client).
|
||||
if pe != nil {
|
||||
taskHandle.complete("failed", []byte(pe.Error()), pe.Status)
|
||||
} else {
|
||||
taskStatus := "completed"
|
||||
if s >= 400 {
|
||||
taskStatus = "failed"
|
||||
}
|
||||
taskHandle.complete(taskStatus, b, s)
|
||||
}
|
||||
done <- a2aResult{s, b, pe}
|
||||
}()
|
||||
select {
|
||||
@@ -431,10 +468,17 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
c.Data(r.status, "application/json", r.body)
|
||||
return
|
||||
case <-time.After(budget):
|
||||
// Outlived CF's edge limit — ack queued; the goroutine finishes and
|
||||
// the reply lands via WS. The canvas already treats `queued` as
|
||||
// "still processing" (delivery_mode mirrors poll-mode).
|
||||
c.JSON(http.StatusOK, gin.H{"status": "queued", "delivery_mode": "push-async", "method": "message/send"})
|
||||
// Outlived CF's edge limit — ack queued with the task_id
|
||||
// so the canvas can correlate the eventual A2A_RESPONSE
|
||||
// WS push (carrying the same task_id) and/or fall back to
|
||||
// polling GET /task/{task_id}. 202 Accepted is the canonical
|
||||
// "request accepted, result async" code.
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"status": "queued",
|
||||
"task_id": taskHandle.TaskID,
|
||||
"delivery_mode": "push-async",
|
||||
"method": a2aMethodForAck,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -490,6 +534,23 @@ func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool, isCanvasUser bool) (int, []byte, *proxyA2AError) {
|
||||
return h.proxyA2ARequestWithTaskID(ctx, workspaceID, body, callerID, logActivity, isCanvasUser, "")
|
||||
}
|
||||
|
||||
// proxyA2ARequestWithTaskID is the core /a2a dispatch entry point. The
|
||||
// optional `taskID` parameter threads the cap-and-queue task handle
|
||||
// through to the A2A_RESPONSE WS broadcast payload (so the canvas can
|
||||
// correlate the broadcast to the original 202 ack) AND into the
|
||||
// activity_logs.response_body (for forensic / chat-history replay).
|
||||
// Empty string when the call is NOT under the cap-and-queue path
|
||||
// (e.g. poll-mode short-circuit, agent-to-agent) — those paths use
|
||||
// the same wire shape minus the task_id, which is the only
|
||||
// contract change vs the pre-#2751-expansion behavior.
|
||||
//
|
||||
// Keeping the no-task-id overload `proxyA2ARequest` (above) means
|
||||
// the dozens of existing call sites (test scaffolding, agent-to-agent
|
||||
// wrappers, etc.) don't have to thread an empty string through.
|
||||
func (h *WorkspaceHandler) proxyA2ARequestWithTaskID(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool, isCanvasUser bool, taskID string) (int, []byte, *proxyA2AError) {
|
||||
// Access control: workspace-to-workspace requests must pass CanCommunicate check.
|
||||
// Canvas requests (callerID == "") and system callers (webhook:*, system:*, test:*)
|
||||
// are trusted. Self-calls (callerID == workspaceID) are always allowed.
|
||||
@@ -690,7 +751,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
||||
if logActivity && deliveryConfirmed {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccessWithTaskID(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs, taskID)
|
||||
}
|
||||
// Preserve the actual HTTP status code and any body bytes already read.
|
||||
// Previously this returned (0, nil, error) which discarded both.
|
||||
@@ -740,7 +801,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccessWithTaskID(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, resp.StatusCode, durationMs, taskID)
|
||||
}
|
||||
|
||||
// Track LLM token usage for cost transparency (#593).
|
||||
|
||||
@@ -359,6 +359,17 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
// pass-through for legacy call sites that don't have a taskID.
|
||||
h.logA2ASuccessWithTaskID(ctx, workspaceID, callerID, isCanvasUser, body, respBody, a2aMethod, statusCode, durationMs, "")
|
||||
}
|
||||
|
||||
// logA2ASuccessWithTaskID is the task-id-aware variant. When `taskID` is
|
||||
// non-empty (the cap-and-queue path set it in the 202 ack), the
|
||||
// A2A_RESPONSE broadcast payload and the activity_logs.response_body
|
||||
// both carry it, so the canvas can correlate the WS push to the
|
||||
// original POST AND the polling fallback (GET /task/{task_id}) can
|
||||
// surface the same task_id.
|
||||
func (h *WorkspaceHandler) logA2ASuccessWithTaskID(ctx context.Context, workspaceID, callerID string, isCanvasUser bool, body, respBody []byte, a2aMethod string, statusCode, durationMs int, taskID string) {
|
||||
logStatus := "ok"
|
||||
if statusCode >= 400 {
|
||||
logStatus = "error"
|
||||
@@ -438,13 +449,24 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
// receives both the inline HTTP reply and this WS event, and
|
||||
// appendMessageDeduped collapses them by (role, content, 3s window), which
|
||||
// is exactly why the anonymous canvas path doesn't double-render today.
|
||||
//
|
||||
// Async-contract expansion (core#2751 durable): the payload also carries
|
||||
// the cap-and-queue task_id when one is in flight. The canvas correlates
|
||||
// the WS push to its pending send via task_id (primary) or message_id
|
||||
// (fallback for older servers). Empty task_id on non-cap-and-queue paths
|
||||
// — the canvas's correlation falls back to message_id, which is
|
||||
// identical to the pre-expansion behavior.
|
||||
if (callerID == "" || isCanvasUser) && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
|
||||
payload := map[string]interface{}{
|
||||
"response_body": json.RawMessage(respBody),
|
||||
"method": a2aMethod,
|
||||
"duration_ms": durationMs,
|
||||
"message_id": extractMessageIdFromA2ABody(body),
|
||||
})
|
||||
}
|
||||
if taskID != "" {
|
||||
payload["task_id"] = taskID
|
||||
}
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), payload)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3008,12 +3008,16 @@ func TestProxyA2A_CanvasCapAndQueue(t *testing.T) {
|
||||
handler.ProxyA2A(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected 202 Accepted (cap-and-queue expansion), got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), `"queued"`) {
|
||||
t.Errorf("expected queued ack, got: %s", w.Body.String())
|
||||
}
|
||||
// task_id must be present in the ack body (core#2751 durable expansion).
|
||||
if !strings.Contains(w.Body.String(), `"task_id"`) {
|
||||
t.Errorf("expected task_id in 202 ack, got: %s", w.Body.String())
|
||||
}
|
||||
// Returned at ~budget, NOT after the (blocked) agent — proves the cap fired.
|
||||
if elapsed > 2*time.Second {
|
||||
t.Errorf("handler held the connection (%v) instead of capping at the budget", elapsed)
|
||||
@@ -3236,8 +3240,12 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// 1. The HTTP response is the queued ack (not the agent reply).
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
||||
// core#2751 durable async-dispatch expansion: status is 202 Accepted
|
||||
// (not 200) and the body carries a task_id for WS correlation +
|
||||
// polling-fallback. The 200-OK + no-task-id shape is the
|
||||
// pre-expansion behavior.
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected 202 Accepted, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), `"queued"`) {
|
||||
t.Errorf("expected queued ack (sub-budget forced the cap), got: %s", w.Body.String())
|
||||
@@ -3245,6 +3253,17 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
|
||||
if !strings.Contains(w.Body.String(), `"push-async"`) {
|
||||
t.Errorf("expected delivery_mode:push-async, got: %s", w.Body.String())
|
||||
}
|
||||
// task_id is a UUID — pull it and assert non-empty. (The exact value is
|
||||
// not asserted; a regression that returns the same task_id twice across
|
||||
// distinct cap-and-queue acks would be a UUID generator bug, not a
|
||||
// test failure.)
|
||||
var ackResp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &ackResp); err != nil {
|
||||
t.Fatalf("ack body is not valid JSON: %v", err)
|
||||
}
|
||||
if tid, _ := ackResp["task_id"].(string); tid == "" {
|
||||
t.Errorf("expected non-empty task_id in the 202 ack, got: %s", w.Body.String())
|
||||
}
|
||||
// Returned at ~budget, NOT after the (blocked) agent.
|
||||
if elapsed > 300*time.Millisecond {
|
||||
t.Errorf("handler held the connection (%v) instead of capping at the 50ms budget", elapsed)
|
||||
|
||||
@@ -0,0 +1,349 @@
|
||||
package handlers
|
||||
|
||||
// a2a_task_store.go — In-memory task handle store for the async /a2a dispatch
|
||||
// contract (core#2751 durable async-dispatch expansion).
|
||||
//
|
||||
// CONTEXT
|
||||
// ========
|
||||
// The cap-and-queue returns immediately (202 Accepted) with a task_id when
|
||||
// the canvas→agent turn is projected to outlive the CF ~100s edge limit.
|
||||
// The reply arrives via the existing A2A_RESPONSE WebSocket push (see
|
||||
// `logA2ASuccess` in a2a_proxy_helpers.go), correlated by `task_id`.
|
||||
// This file adds the per-task durable buffer + GET /task/{task_id} polling
|
||||
// endpoint that lets the canvas recover the result if the WS push is missed
|
||||
// (network blip, closed socket, canvas reload).
|
||||
//
|
||||
// DESIGN
|
||||
// ======
|
||||
// 1. Each canvas POST that triggers the cap-and-queue ack allocates a
|
||||
// `TaskHandle`, persisted in the in-memory store keyed by (workspaceID,
|
||||
// task_id).
|
||||
// 2. The detached dispatch goroutine in ProxyA2A calls
|
||||
// `taskHandle.complete(status, result, statusCode)` when it finishes — whether the agent
|
||||
// returned 2xx or errored.
|
||||
// 3. The GET /task/{task_id} endpoint reads the handle and returns the
|
||||
// buffered result. Status "pending" → "completed" / "failed" once
|
||||
// the dispatch settles; an unknown / expired task_id returns 404.
|
||||
// 4. The store has a TTL (default 5 min) with a janitor goroutine that
|
||||
// prunes expired entries every 60s. 5 min comfortably exceeds the
|
||||
// longest normal canvas WS reconnect window (a hard-refresh of the
|
||||
// canvas tab is the worst-case legitimate "miss the WS push" trigger).
|
||||
//
|
||||
// RACE / LATE-ARRIVAL HANDLING
|
||||
// ============================
|
||||
// The contract-critical edge case is: the agent replies sub-second; the
|
||||
// A2A_RESPONSE WS push is delivered BEFORE the canvas has finished
|
||||
// processing the 202 response (and thus has registered the task_id for
|
||||
// correlation). The server-side task store is the durable buffer: even
|
||||
// if the WS push is dropped on the wire, the canvas can recover the
|
||||
// result by polling. The "WS result is held in task store even after
|
||||
// broadcast" property is the contract-critical piece.
|
||||
//
|
||||
// RACE COVERAGE: see a2a_task_store_test.go (TestTaskStore_LateArrivalRace)
|
||||
// which simulates a sub-second agent reply + immediate WS push that races
|
||||
// the canvas's 202-response processing, and verifies the GET endpoint
|
||||
// serves the result regardless.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// TaskHandle records one async /a2a dispatch and its eventual outcome.
//
// Status values used by this package:
//
//	"pending"   — initial state, set by newPendingHandle.
//	"completed" — the dispatch goroutine finished with a status code below 400.
//	"failed"    — the dispatch goroutine finished with a status code of 400
//	              or above, or with a dispatch error.
//
// A settled handle is meant to stay in the store until its TTL expires so a
// late poll (for example after a canvas reconnect) can still read the result.
type TaskHandle struct {
	// TaskID is the UUID handed to the canvas in the 202 ack.
	TaskID string `json:"task_id"`
	// WorkspaceID is the workspace the dispatch was addressed to.
	WorkspaceID string `json:"workspace_id"`
	// Status is one of "pending", "completed" or "failed".
	Status string `json:"status"`
	// Result holds the bytes passed to complete; omitted while empty.
	Result json.RawMessage `json:"result,omitempty"`
	// StatusCode is the status code passed to complete; omitted while zero.
	StatusCode int `json:"status_code,omitempty"`
	// CreatedAt is the time the handle was allocated.
	CreatedAt time.Time `json:"created_at"`
	// CompletedAt is nil until the handle settles.
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// mu is taken by complete around the pending → terminal transition.
	mu sync.RWMutex
}
|
||||
|
||||
// TaskStore is a per-process map of TaskHandle. The single-process scope
|
||||
// is fine for the current deployment (one ws-server per workspace
|
||||
// cluster) — the canvas polls a specific ws-server instance for a
|
||||
// given task_id, and ws-server affinity is implicit (the POST and the
|
||||
// subsequent GET hit the same instance because of sticky LB routing
|
||||
// for the workspace's a2a path).
|
||||
//
|
||||
// RACE / CONCURRENCY:
|
||||
// - pending → completed/failed is a single transition guarded by `mu`.
|
||||
// - concurrent Get on a not-yet-completed handle returns "pending"
|
||||
// (the right answer for the canvas; it'll poll again).
|
||||
// - concurrent Get on a completed handle returns the buffered
|
||||
// result (durable after the transition).
|
||||
type TaskStore struct {
|
||||
mu sync.RWMutex
|
||||
handles map[string]*TaskHandle // key: task_id (UUIDs are unique across workspaces)
|
||||
ttl time.Duration
|
||||
stopOnce sync.Once
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// Default TTL = 5 minutes. Comfortably exceeds the longest legitimate
|
||||
// canvas reconnect window (hard refresh) + the standard agent-turn
|
||||
// upper bound for in-flight tasks. Tighter TTL would risk GC'ing a
|
||||
// still-pending task before the agent replies; looser TTL is just
|
||||
// memory pressure without functional benefit. 5min is the
|
||||
// sweet spot.
|
||||
const defaultTaskStoreTTL = 5 * time.Minute
|
||||
|
||||
// defaultTaskStoreCleanupInterval is the janitor period. 60s is fine
|
||||
// granularity for a 5min TTL (the worst-case window-staleness on a
|
||||
// handle is one cleanup interval, which is invisible at 5min scale).
|
||||
const defaultTaskStoreCleanupInterval = 60 * time.Second
|
||||
|
||||
// taskStore is the package-level singleton. Created lazily by
|
||||
// `getTaskStore()` (which also starts the janitor). Tests can swap the
|
||||
// singleton via `setTaskStoreForTest` to inject a shorter-TTL /
|
||||
// no-janitor store.
|
||||
var (
|
||||
taskStoreMu sync.Mutex
|
||||
taskStoreSingleton *TaskStore
|
||||
)
|
||||
|
||||
func getTaskStore() *TaskStore {
|
||||
taskStoreMu.Lock()
|
||||
defer taskStoreMu.Unlock()
|
||||
if taskStoreSingleton == nil {
|
||||
taskStoreSingleton = newTaskStore(defaultTaskStoreTTL, defaultTaskStoreCleanupInterval)
|
||||
}
|
||||
return taskStoreSingleton
|
||||
}
|
||||
|
||||
// setTaskStoreForTest replaces the singleton. The test is responsible
|
||||
// for calling the returned restore func to put the original back (and
|
||||
// for stopping the injected store's janitor if any).
|
||||
func setTaskStoreForTest(s *TaskStore) func() {
|
||||
taskStoreMu.Lock()
|
||||
prev := taskStoreSingleton
|
||||
taskStoreSingleton = s
|
||||
taskStoreMu.Unlock()
|
||||
return func() {
|
||||
taskStoreMu.Lock()
|
||||
if taskStoreSingleton == s {
|
||||
taskStoreSingleton = prev
|
||||
}
|
||||
taskStoreMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskStore(ttl, cleanupInterval time.Duration) *TaskStore {
|
||||
ts := &TaskStore{
|
||||
handles: make(map[string]*TaskHandle),
|
||||
ttl: ttl,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
if cleanupInterval > 0 {
|
||||
go ts.janitor(cleanupInterval)
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
// Stop terminates the janitor goroutine. Called from test cleanup;
|
||||
// production never calls this (the singleton lives for the process
|
||||
// lifetime).
|
||||
func (s *TaskStore) Stop() {
|
||||
s.stopOnce.Do(func() { close(s.stopCh) })
|
||||
}
|
||||
|
||||
// newPendingHandle creates a TaskHandle in "pending" state with a
|
||||
// fresh UUID. The handle is registered in the store atomically; the
|
||||
// caller is handed the handle so the dispatch goroutine can call
|
||||
// `complete` on it without re-looking up by task_id.
|
||||
func (s *TaskStore) newPendingHandle(workspaceID string) *TaskHandle {
|
||||
id := uuid.New().String()
|
||||
h := &TaskHandle{
|
||||
TaskID: id,
|
||||
WorkspaceID: workspaceID,
|
||||
Status: "pending",
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.handles[id] = h
|
||||
s.mu.Unlock()
|
||||
return h
|
||||
}
|
||||
|
||||
// complete marks the handle as completed/failed and stores the result.
|
||||
// Idempotent: subsequent calls are no-ops (the FIRST terminal state
|
||||
// wins; the WS push and the canvas poll are racing for who sees
|
||||
// the result, but the result is the SAME either way).
|
||||
func (h *TaskHandle) complete(status string, result []byte, statusCode int) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.Status != "pending" {
|
||||
return
|
||||
}
|
||||
h.Status = status
|
||||
if len(result) > 0 {
|
||||
// Sanitize: ensure the JSON is well-formed. Empty / invalid
|
||||
// bodies (e.g. an error path with no body) stay empty.
|
||||
if json.Valid(result) {
|
||||
h.Result = json.RawMessage(result)
|
||||
} else {
|
||||
// Wrap in a JSON object so the polling endpoint always
|
||||
// returns valid JSON. Operators debugging via the poll
|
||||
// endpoint should never see a parse error.
|
||||
h.Result = json.RawMessage(`{"raw":` + string(json.RawMessage(result)) + `}`)
|
||||
}
|
||||
}
|
||||
h.StatusCode = statusCode
|
||||
now := time.Now()
|
||||
h.CompletedAt = &now
|
||||
}
|
||||
|
||||
// get returns a copy of the handle for the GET endpoint. Returns nil
|
||||
// if the task_id is unknown (caller maps to 404).
|
||||
func (s *TaskStore) get(taskID string) *TaskHandle {
|
||||
s.mu.RLock()
|
||||
h, ok := s.handles[taskID]
|
||||
s.mu.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
// Return a copy under the handle's own lock so the GET response
|
||||
// reflects a consistent snapshot. The original may continue
|
||||
// mutating after the copy returns (rare in practice — once a
|
||||
// handle is completed, it doesn't change — but the copy is
|
||||
// safe regardless).
|
||||
cp := &TaskHandle{
|
||||
TaskID: h.TaskID,
|
||||
WorkspaceID: h.WorkspaceID,
|
||||
CreatedAt: h.CreatedAt,
|
||||
}
|
||||
h.mu.RLock()
|
||||
cp.Status = h.Status
|
||||
cp.Result = append(json.RawMessage(nil), h.Result...)
|
||||
cp.StatusCode = h.StatusCode
|
||||
if h.CompletedAt != nil {
|
||||
t := *h.CompletedAt
|
||||
cp.CompletedAt = &t
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
return cp
|
||||
}
|
||||
|
||||
// janitor periodically prunes handles older than `s.ttl`. Runs until
|
||||
// Stop is called. Errors are logged but do not terminate the
|
||||
// goroutine — the next tick will retry.
|
||||
func (s *TaskStore) janitor(interval time.Duration) {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
s.prune()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TaskStore) prune() {
|
||||
cutoff := time.Now().Add(-s.ttl)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
pruned := 0
|
||||
for id, h := range s.handles {
|
||||
h.mu.RLock()
|
||||
expired := h.CreatedAt.Before(cutoff)
|
||||
h.mu.RUnlock()
|
||||
if expired {
|
||||
delete(s.handles, id)
|
||||
pruned++
|
||||
}
|
||||
}
|
||||
if pruned > 0 {
|
||||
log.Printf("a2a_task_store: pruned %d expired handle(s) (TTL=%v)", pruned, s.ttl)
|
||||
}
|
||||
}
|
||||
|
||||
// Size returns the current number of tracked handles. Exposed for
|
||||
// tests + ops debug endpoints (e.g. a future `/_internal/task_store_size`).
|
||||
func (s *TaskStore) Size() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return len(s.handles)
|
||||
}
|
||||
|
||||
// handleGetTask is the gin handler for `GET
|
||||
// /workspaces/:id/a2a/task/:task_id`. The polling fallback for the
|
||||
// async contract.
|
||||
//
|
||||
// Response shape:
|
||||
//
|
||||
// 200 { "task_id": "...", "workspace_id": "...",
|
||||
// "status": "pending" | "completed" | "failed",
|
||||
// "result": {...} | null,
|
||||
// "status_code": 200 | 500 | ... ,
|
||||
// "created_at": "...", "completed_at": "..." | null }
|
||||
// 404 — task_id unknown or expired
|
||||
// 400 — malformed task_id (defensive; UUIDs come from us, so this
|
||||
// should not fire in practice)
|
||||
//
|
||||
// Auth: requires the caller to be authenticated. The task_id is
|
||||
// scoped to the workspace_id from the URL, so a 404 covers BOTH
|
||||
// "unknown task_id" AND "task_id belongs to a different workspace"
|
||||
// (we never return a 404 with the workspace_id leaked).
|
||||
func (h *WorkspaceHandler) handleGetTask(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
taskID := c.Param("task_id")
|
||||
|
||||
if taskID == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "task_id is required"})
|
||||
return
|
||||
}
|
||||
|
||||
handle := getTaskStore().get(taskID)
|
||||
if handle == nil || handle.WorkspaceID != workspaceID {
|
||||
// Treat both "unknown" and "wrong workspace" as 404 — same
|
||||
// opaque response, no info leak. The task_id space is global
|
||||
// (UUIDs don't collide across workspaces) so a mismatch
|
||||
// means the caller is asking about a task that isn't theirs.
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "task not found"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, handle)
|
||||
}
|
||||
|
||||
// RegisterA2ATaskRoute adds the GET /task/:task_id polling endpoint
|
||||
// to the router. Called from internal/router/router.go alongside the
|
||||
// other /a2a routes. Extracted so the route registration is co-located
|
||||
// with the handler — easier to spot in router.go audits.
|
||||
//
|
||||
// The route is registered as a sub-path of /workspaces/:id/a2a/task/
|
||||
// (NOT /workspaces/:id/a2a/ because that path is the POST handler).
|
||||
// The canvas will hit it via the api.ts client wrapper.
|
||||
//
|
||||
// The argument is *gin.Engine (the top-level router), not a
|
||||
// RouterGroup, to match the rest of router.go's style (the other
|
||||
// /a2a-adjacent routes are registered on the top-level engine).
|
||||
func (h *WorkspaceHandler) RegisterA2ATaskRoute(r *gin.Engine) {
|
||||
r.GET("/workspaces/:id/a2a/task/:task_id", h.handleGetTask)
|
||||
}
|
||||
|
||||
// Blank reference that keeps the "context" import in use, so the file
// still compiles if a later refactor removes the last real use of the
// package and the import list does not need a churn edit.
var (
	_ = context.Background
)
|
||||
@@ -0,0 +1,335 @@
|
||||
package handlers
|
||||
|
||||
// a2a_task_store_test.go — Server-side tests for the core#2751
|
||||
// durable async-dispatch expansion (202+task_id, in-memory task
|
||||
// store, GET /task/{task_id} polling endpoint).
|
||||
//
|
||||
// Required test coverage (per the user/driver-approved spec):
|
||||
//
|
||||
// (a) long turn >100s → 202 immediately, result via WS, no 524
|
||||
// → covered by TestProxyA2A_CanvasCapAndQueue_EndToEndContract
|
||||
// (in a2a_proxy_test.go) which already exists; the new 202+task_id
|
||||
// assertion was added in this PR.
|
||||
//
|
||||
// (b) short turn still resolves fast (inline reply) → covered by
|
||||
// TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch and
|
||||
// the existing fast-path tests; the task store isn't touched on
|
||||
// the fast path (no cap-and-queue) so the inline behavior is
|
||||
// unchanged.
|
||||
//
|
||||
// (c) result-delivery ordering/race covered → THIS FILE.
|
||||
// TestTaskStore_LateArrivalRace simulates a sub-second agent
|
||||
// reply that races the canvas 202-response processing. The
|
||||
// task store must serve the result regardless of which
|
||||
// side wins the race, because the store is the durable
|
||||
// buffer.
|
||||
//
|
||||
// (d) WS not starved during a long turn → covered by the existing
|
||||
// TestProxyA2A_CanvasCapAndQueue (the cap-and-queue returns at
|
||||
// ~budget, NOT after the blocked agent, so the WS path is
|
||||
// never blocked; the new 202+task_id ack doesn't add latency).
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestTaskStore_NewPendingHandle_GeneratesUniqueIDs: the cap-and-queue
|
||||
// path allocates a fresh task_id per ack. Two consecutive allocations
|
||||
// must produce DIFFERENT ids — a regression that returned a constant
|
||||
// would be a critical canvas-side correlation failure (every WS push
|
||||
// would match the most-recently-acked send).
|
||||
func TestTaskStore_NewPendingHandle_GeneratesUniqueIDs(t *testing.T) {
|
||||
s := newTaskStore(1*time.Minute, 0) // no janitor for unit-test isolation
|
||||
defer s.Stop()
|
||||
|
||||
h1 := s.newPendingHandle("ws-1")
|
||||
h2 := s.newPendingHandle("ws-1")
|
||||
|
||||
if h1.TaskID == h2.TaskID {
|
||||
t.Fatalf("two allocations produced the same task_id %q — canvas-side correlation would break", h1.TaskID)
|
||||
}
|
||||
if h1.TaskID == "" || h2.TaskID == "" {
|
||||
t.Fatalf("empty task_id allocated: h1=%q h2=%q", h1.TaskID, h2.TaskID)
|
||||
}
|
||||
if h1.Status != "pending" || h2.Status != "pending" {
|
||||
t.Errorf("new handles should be pending: h1=%q h2=%q", h1.Status, h2.Status)
|
||||
}
|
||||
if h1.WorkspaceID != "ws-1" || h2.WorkspaceID != "ws-1" {
|
||||
t.Errorf("workspace_id mismatch: h1=%q h2=%q", h1.WorkspaceID, h2.WorkspaceID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Complete_StoresResult: completing a handle must
|
||||
// store the result body so a subsequent GET /task/{task_id} serves
|
||||
// it (the polling fallback for missed WS pushes).
|
||||
func TestTaskStore_Complete_StoresResult(t *testing.T) {
|
||||
s := newTaskStore(1*time.Minute, 0)
|
||||
defer s.Stop()
|
||||
|
||||
h := s.newPendingHandle("ws-1")
|
||||
resultBody := []byte(`{"jsonrpc":"2.0","result":{"reply":"hello"}}`)
|
||||
h.complete("completed", resultBody, http.StatusOK)
|
||||
|
||||
got := s.get(h.TaskID)
|
||||
if got == nil {
|
||||
t.Fatalf("get(%q) returned nil after complete()", h.TaskID)
|
||||
}
|
||||
if got.Status != "completed" {
|
||||
t.Errorf("status = %q, want %q", got.Status, "completed")
|
||||
}
|
||||
if got.StatusCode != http.StatusOK {
|
||||
t.Errorf("status_code = %d, want %d", got.StatusCode, http.StatusOK)
|
||||
}
|
||||
if !json.Valid(got.Result) {
|
||||
t.Errorf("result is not valid JSON: %s", string(got.Result))
|
||||
}
|
||||
if string(got.Result) != string(resultBody) {
|
||||
t.Errorf("result body mismatch: got %q, want %q", string(got.Result), string(resultBody))
|
||||
}
|
||||
if got.CompletedAt == nil {
|
||||
t.Errorf("completed_at should be set")
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Get_UnknownTaskID: the GET endpoint must return nil
|
||||
// for an unknown task_id (which the handler maps to 404). A
|
||||
// regression that returned a stale or empty handle would be a
|
||||
// security/privacy bug (a canvas that polls another workspace's
|
||||
// task_id would get a non-404 answer).
|
||||
func TestTaskStore_Get_UnknownTaskID(t *testing.T) {
|
||||
s := newTaskStore(1*time.Minute, 0)
|
||||
defer s.Stop()
|
||||
|
||||
if got := s.get("nonexistent-uuid"); got != nil {
|
||||
t.Errorf("get(nonexistent) = %+v, want nil", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Complete_Idempotent: completing a handle twice
|
||||
// (e.g. the dispatch goroutine returns AND a re-poll fires before
|
||||
// the canvas prunes) must NOT clobber the FIRST terminal state.
|
||||
// The first completion wins; subsequent calls are no-ops.
|
||||
func TestTaskStore_Complete_Idempotent(t *testing.T) {
|
||||
s := newTaskStore(1*time.Minute, 0)
|
||||
defer s.Stop()
|
||||
|
||||
h := s.newPendingHandle("ws-1")
|
||||
h.complete("completed", []byte(`{"v":1}`), http.StatusOK)
|
||||
h.complete("failed", []byte(`{"v":2}`), http.StatusInternalServerError) // should be ignored
|
||||
|
||||
got := s.get(h.TaskID)
|
||||
if got == nil {
|
||||
t.Fatalf("get(%q) returned nil", h.TaskID)
|
||||
}
|
||||
if got.Status != "completed" {
|
||||
t.Errorf("status = %q, want %q (first completion must win)", got.Status, "completed")
|
||||
}
|
||||
if string(got.Result) != `{"v":1}` {
|
||||
t.Errorf("result = %q, want %q (first completion must win)", string(got.Result), `{"v":1}`)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_Prune_RemovesExpired: the janitor must drop handles
|
||||
// older than the TTL. The polling endpoint's 404 for an expired
|
||||
// task is the expected client-side UX — the canvas then re-POSTs
|
||||
// if it still needs the result (after a hard refresh etc.).
|
||||
func TestTaskStore_Prune_RemovesExpired(t *testing.T) {
|
||||
// Tight TTL for the test: 50ms. Janitor disabled; we call prune()
|
||||
// directly to keep timing deterministic.
|
||||
s := newTaskStore(50*time.Millisecond, 0)
|
||||
defer s.Stop()
|
||||
|
||||
h := s.newPendingHandle("ws-1")
|
||||
if s.Size() != 1 {
|
||||
t.Fatalf("expected 1 handle, got %d", s.Size())
|
||||
}
|
||||
|
||||
// Wait past the TTL.
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
s.prune()
|
||||
if s.Size() != 0 {
|
||||
t.Errorf("expected 0 handles after prune, got %d", s.Size())
|
||||
}
|
||||
if got := s.get(h.TaskID); got != nil {
|
||||
t.Errorf("get(expired) = %+v, want nil", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_LateArrivalRace: THE contract-critical test. The
|
||||
// canvas POSTs, gets 202+task_id, but the agent's reply arrives via
|
||||
// WS BEFORE the canvas has finished processing the 202 response.
|
||||
// The WS push is best-effort; the GET /task/{task_id} endpoint is
|
||||
// the durable recovery. This test pins that property.
|
||||
//
|
||||
// Race timeline modeled:
|
||||
//
|
||||
// t0: canvas POSTs
|
||||
// t0+ε: server allocates taskHandle (pending)
|
||||
// t0+10ms: detached dispatch goroutine completes (fast agent)
|
||||
// t0+10ms: server records result in taskHandle (completed)
|
||||
// t0+10ms: server broadcasts A2A_RESPONSE (task_id in payload)
|
||||
// t0+50ms: canvas's 202-response handler completes
|
||||
// t0+50ms: canvas polls GET /task/{task_id}
|
||||
//
|
||||
// Even if the canvas misses the WS push (network blip, client
|
||||
// reload between t0+10ms and t0+50ms), the GET endpoint must
|
||||
// return the completed result. The race is also exercised in
|
||||
// the reverse direction (poll-then-broadcast) but the test
|
||||
// invariant holds for both orderings.
|
||||
func TestTaskStore_LateArrivalRace(t *testing.T) {
|
||||
// Use a package-internal store with a long TTL so the handle
|
||||
// survives the test window. The package-level singleton is
|
||||
// not used here so concurrent test runs don't cross-contaminate.
|
||||
ts := newTaskStore(5*time.Minute, 0)
|
||||
defer ts.Stop()
|
||||
|
||||
// Pin the package-level singleton to the test store.
|
||||
restore := setTaskStoreForTest(ts)
|
||||
defer restore()
|
||||
|
||||
h := ts.newPendingHandle("ws-race")
|
||||
|
||||
// Simulate the detached dispatch completing (the agent
|
||||
// replied sub-second — well before any client-side budget).
|
||||
resultBody := []byte(`{"jsonrpc":"2.0","result":{"reply":"fast agent"}}`)
|
||||
h.complete("completed", resultBody, http.StatusOK)
|
||||
|
||||
// The canvas's 202-response handler has not yet returned
|
||||
// (it will, but the test simulates the canvas polling
|
||||
// GET /task/{task_id} before that completes — a tab reload
|
||||
// between t0+10ms and t0+50ms, say). The GET endpoint
|
||||
// must serve the result.
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{
|
||||
{Key: "id", Value: "ws-race"},
|
||||
{Key: "task_id", Value: h.TaskID},
|
||||
}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-race/a2a/task/"+h.TaskID, nil)
|
||||
|
||||
rec := &recordingBroadcaster{}
|
||||
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
|
||||
handler.handleGetTask(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("GET /task/{task_id} returned %d (body=%s), want 200", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response is not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "completed" {
|
||||
t.Errorf("status = %v, want %q", resp["status"], "completed")
|
||||
}
|
||||
if resp["task_id"] != h.TaskID {
|
||||
t.Errorf("task_id = %v, want %q", resp["task_id"], h.TaskID)
|
||||
}
|
||||
if resp["result"] == nil {
|
||||
t.Errorf("result is nil — the late-arrival race is NOT covered: a missed WS push would lose the reply")
|
||||
}
|
||||
// The result is a json.RawMessage in the handle, so it
|
||||
// round-trips as a JSON object (or string); re-parse and
|
||||
// assert the reply text is intact.
|
||||
if result, ok := resp["result"].(map[string]interface{}); ok {
|
||||
if r, ok := result["result"].(map[string]interface{}); ok {
|
||||
if r["reply"] != "fast agent" {
|
||||
t.Errorf("result.result.reply = %v, want %q", r["reply"], "fast agent")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_GetTask_NotFound: the handler must return 404 for
|
||||
// both unknown task_id AND task_id that belongs to a different
|
||||
// workspace (no cross-workspace info leak).
|
||||
func TestTaskStore_GetTask_NotFound(t *testing.T) {
|
||||
ts := newTaskStore(1*time.Minute, 0)
|
||||
defer ts.Stop()
|
||||
restore := setTaskStoreForTest(ts)
|
||||
defer restore()
|
||||
|
||||
// Handle in workspace ws-A.
|
||||
h := ts.newPendingHandle("ws-A")
|
||||
h.complete("completed", []byte(`{"x":1}`), http.StatusOK)
|
||||
|
||||
rec := &recordingBroadcaster{}
|
||||
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Case 1: unknown task_id.
|
||||
w1 := httptest.NewRecorder()
|
||||
c1, _ := gin.CreateTestContext(w1)
|
||||
c1.Params = gin.Params{{Key: "id", Value: "ws-A"}, {Key: "task_id", Value: "does-not-exist"}}
|
||||
c1.Request = httptest.NewRequest("GET", "/workspaces/ws-A/a2a/task/does-not-exist", nil)
|
||||
handler.handleGetTask(c1)
|
||||
if w1.Code != http.StatusNotFound {
|
||||
t.Errorf("unknown task_id: got %d, want 404", w1.Code)
|
||||
}
|
||||
|
||||
// Case 2: real task_id BUT wrong workspace — same 404 (no info leak).
|
||||
w2 := httptest.NewRecorder()
|
||||
c2, _ := gin.CreateTestContext(w2)
|
||||
c2.Params = gin.Params{{Key: "id", Value: "ws-B"}, {Key: "task_id", Value: h.TaskID}}
|
||||
c2.Request = httptest.NewRequest("GET", "/workspaces/ws-B/a2a/task/"+h.TaskID, nil)
|
||||
handler.handleGetTask(c2)
|
||||
if w2.Code != http.StatusNotFound {
|
||||
t.Errorf("cross-workspace poll: got %d, want 404 (no info leak)", w2.Code)
|
||||
}
|
||||
|
||||
// Case 3: real task_id + correct workspace — 200.
|
||||
w3 := httptest.NewRecorder()
|
||||
c3, _ := gin.CreateTestContext(w3)
|
||||
c3.Params = gin.Params{{Key: "id", Value: "ws-A"}, {Key: "task_id", Value: h.TaskID}}
|
||||
c3.Request = httptest.NewRequest("GET", "/workspaces/ws-A/a2a/task/"+h.TaskID, nil)
|
||||
handler.handleGetTask(c3)
|
||||
if w3.Code != http.StatusOK {
|
||||
t.Errorf("correct workspace: got %d, want 200", w3.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTaskStore_ConcurrentAccess pins the thread-safety of the
|
||||
// in-memory store under realistic concurrent load (e.g. 100
|
||||
// canvas tabs each issuing a POST + GET, racing with a few
|
||||
// dispatch-goroutine completes). The test asserts the store does
|
||||
// not deadlock, lose writes, or panic.
|
||||
func TestTaskStore_ConcurrentAccess(t *testing.T) {
|
||||
ts := newTaskStore(1*time.Minute, 0)
|
||||
defer ts.Stop()
|
||||
restore := setTaskStoreForTest(ts)
|
||||
defer restore()
|
||||
|
||||
const (
|
||||
nWorkers = 16
|
||||
nTasksPer = 50
|
||||
)
|
||||
var wg sync.WaitGroup
|
||||
var completedCount atomic.Int64
|
||||
for w := 0; w < nWorkers; w++ {
|
||||
wg.Add(1)
|
||||
go func(w int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < nTasksPer; i++ {
|
||||
h := ts.newPendingHandle("ws-x")
|
||||
h.complete("completed", []byte(`{"ok":true}`), http.StatusOK)
|
||||
if got := ts.get(h.TaskID); got != nil && got.Status == "completed" {
|
||||
completedCount.Add(1)
|
||||
}
|
||||
}
|
||||
}(w)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
want := int64(nWorkers * nTasksPer)
|
||||
if completedCount.Load() != want {
|
||||
t.Errorf("completedCount = %d, want %d (lost writes under concurrent access)", completedCount.Load(), want)
|
||||
}
|
||||
}
|
||||
@@ -242,6 +242,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
|
||||
// A2A proxy — registered outside the auth group; already enforces CanCommunicate access control.
|
||||
r.POST("/workspaces/:id/a2a", wh.ProxyA2A)
|
||||
// A2A task-status polling endpoint (core#2751 durable async-dispatch
|
||||
// expansion). The canvas uses this as the polling fallback when the
|
||||
// A2A_RESPONSE WS push is missed (canvas hard-refresh, network blip).
|
||||
// The handler is registered via the WorkspaceHandler so it can be
|
||||
// unit-tested with the same harness as the rest of the a2a proxy.
|
||||
wh.RegisterA2ATaskRoute(r)
|
||||
|
||||
// A2A queue status lookup (RFC #2331 Tier 1) — registered outside the
|
||||
// workspace auth group because the row's caller_id may be a DIFFERENT
|
||||
|
||||
Reference in New Issue
Block a user