From d97d7d4768460b8b22978e47d61e2820f0b425ff Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Sun, 26 Apr 2026 10:14:19 -0700
Subject: [PATCH] fix(platform/delegation): classify queued response + stitch
 drain result back
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When proxyA2A returns 202+{queued:true} (target busy → enqueued for drain
on next heartbeat), executeDelegation previously treated it as a successful
completion and ran extractResponseText on the queued JSON. The result was
'Delegation completed (workspace agent busy — request queued, will
dispatch...)' landing in activity_logs.summary, which the LLM then echoed
to the user chat as garbage.

Two fixes:

1. delegation.go: detect queued shape via new isQueuedProxyResponse helper,
   write status='queued' with clean summary 'Delegation queued — target at
   capacity', store delegation_id in response_body so the drain can stitch
   back later. Also embed delegation_id in params.message.metadata + use it
   as messageId so the proxy's idempotency-key path keys off the same id.

2. a2a_queue.go: when DrainQueueForWorkspace successfully drains a queued
   item, extract delegation_id from the body's metadata and UPDATE the
   originating delegate_result row (queued → completed with real
   response_body). Broadcast DELEGATION_COMPLETE so the canvas chat feed
   flips the queued line to completed in real time.

Closes the loop so check_task_status reflects ground truth instead of
perpetual 'queued' even after the queued request eventually drained.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
---
 .../internal/handlers/a2a_queue.go       | 87 ++++++++++++++++++-
 .../internal/handlers/a2a_queue_test.go  | 33 +++++++
 .../internal/handlers/delegation.go      | 63 +++++++++++++-
 .../internal/handlers/delegation_test.go | 33 +++++++
 4 files changed, 212 insertions(+), 4 deletions(-)

diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go
index b747fac4..fb0e3b80 100644
--- a/workspace-server/internal/handlers/a2a_queue.go
+++ b/workspace-server/internal/handlers/a2a_queue.go
@@ -288,7 +288,7 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
 	}
 	// logActivity=false: the original EnqueueA2A callsite already logged
 	// the dispatch attempt; re-logging here would double-count events.
-	status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
+	status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
 
 	// 202 Accepted = the dispatch was itself queued again (target still busy).
 	// That's not a failure — the queued item just stays queued naturally on
@@ -321,4 +321,89 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
 	MarkQueueItemCompleted(ctx, item.ID)
 	log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
 		item.ID, workspaceID, item.Attempts)
+
+	// Stitch the response back to the originating delegation row, if this
+	// queue item was a delegation. Without this, check_task_status would
+	// see status='queued' (set by the executeDelegation queued-branch) and
+	// the LLM would think the work was never done. We embed delegation_id
+	// in params.message.metadata at Delegate-handler time; pull it out
+	// here and UPDATE the delegate_result row so the original caller can
+	// observe the real reply.
+	if delegationID := extractDelegationIDFromBody(item.Body); delegationID != "" {
+		h.stitchDrainResponseToDelegation(ctx, callerID, item.WorkspaceID, delegationID, respBody)
+	}
+}
+
+// extractDelegationIDFromBody pulls params.message.metadata.delegation_id
+// out of an A2A JSON-RPC body. Empty string when absent — drain treats
+// that as "this queue item didn't originate from /workspaces/:id/delegate"
+// and skips the stitch, so non-delegation queue uses (cross-workspace
+// peer-direct A2A) aren't affected.
+func extractDelegationIDFromBody(body []byte) string {
+	var envelope struct {
+		Params struct {
+			Message struct {
+				Metadata struct {
+					DelegationID string `json:"delegation_id"`
+				} `json:"metadata"`
+			} `json:"message"`
+		} `json:"params"`
+	}
+	if err := json.Unmarshal(body, &envelope); err != nil {
+		return ""
+	}
+	return envelope.Params.Message.Metadata.DelegationID
+}
+
+// stitchDrainResponseToDelegation writes the drained response into the
+// delegation's existing delegate_result row (created with status='queued'
+// by executeDelegation when the proxy first returned queued). This is the
+// other half of the loop that closes "queued → completed" so the LLM's
+// check_task_status reflects ground truth.
+//
+// Errors are logged-only — drain is fire-and-forget from Heartbeat, and a
+// stitch failure shouldn't block other queued items. The delegation will
+// just remain stuck at 'queued' in this case, which is the pre-fix
+// behaviour (no regression vs. shipping nothing).
+func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context, sourceID, targetID, delegationID string, respBody []byte) {
+	if sourceID == "" || delegationID == "" {
+		return
+	}
+	responseText := extractResponseText(respBody)
+	respJSON, _ := json.Marshal(map[string]interface{}{
+		"text":          responseText,
+		"delegation_id": delegationID,
+	})
+	res, err := db.DB.ExecContext(ctx, `
+		UPDATE activity_logs
+		SET status = 'completed',
+		    summary = $1,
+		    response_body = $2::jsonb
+		WHERE workspace_id = $3
+		  AND method = 'delegate_result'
+		  AND target_id = $4
+		  AND response_body->>'delegation_id' = $5
+	`, "Delegation completed ("+truncate(responseText, 80)+")", string(respJSON),
+		sourceID, targetID, delegationID)
+	if err != nil {
+		log.Printf("A2AQueue drain stitch: update failed for delegation %s: %v", delegationID, err)
+		return
+	}
+	if rows, _ := res.RowsAffected(); rows == 0 {
+		log.Printf("A2AQueue drain stitch: no delegate_result row for delegation %s (queued-row may not exist yet)", delegationID)
+		return
+	}
+	log.Printf("A2AQueue drain stitch: delegation %s queued → completed (%d chars)", delegationID, len(responseText))
+
+	// Broadcast DELEGATION_COMPLETE so the canvas chat feed flips the
+	// "⏸ queued" line to "✓ completed" in real time. Without this the
+	// transition only surfaces after the user reloads or polls activity.
+	if h.broadcaster != nil {
+		h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{
+			"delegation_id":    delegationID,
+			"target_id":        targetID,
+			"response_preview": truncate(responseText, 200),
+			"via":              "queue_drain",
+		})
+	}
 }
diff --git a/workspace-server/internal/handlers/a2a_queue_test.go b/workspace-server/internal/handlers/a2a_queue_test.go
index 940f7f2f..57000910 100644
--- a/workspace-server/internal/handlers/a2a_queue_test.go
+++ b/workspace-server/internal/handlers/a2a_queue_test.go
@@ -80,6 +80,39 @@ func TestExtractIdempotencyKey_emptyOnMissing(t *testing.T) {
 	}
 }
 
+func TestExtractDelegationIDFromBody(t *testing.T) {
+	cases := []struct {
+		name string
+		body string
+		want string
+	}{
+		{
+			name: "delegation body — metadata.delegation_id present",
+			body: `{"method":"message/send","params":{"message":{"role":"user","messageId":"abc-123","metadata":{"delegation_id":"abc-123"},"parts":[{"type":"text","text":"hi"}]}}}`,
+			want: "abc-123",
+		},
+		{
+			name: "non-delegation body — no metadata (peer-direct A2A)",
+			body: `{"method":"message/send","params":{"message":{"role":"user","messageId":"m-1","parts":[{"type":"text","text":"hi"}]}}}`,
+			want: "",
+		},
+		{
+			name: "metadata present but no delegation_id key",
+			body: `{"params":{"message":{"metadata":{"trace_id":"t-1"}}}}`,
+			want: "",
+		},
+		{"malformed JSON", `not json`, ""},
+		{"empty body", ``, ""},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			if got := extractDelegationIDFromBody([]byte(tc.body)); got != tc.want {
+				t.Errorf("extractDelegationIDFromBody = %q, want %q", got, tc.want)
+			}
+		})
+	}
+}
+
 // ──────────────────────────────────────────────────────────────────────────────
 // DrainQueueForWorkspace — nil-safe error extraction regression tests
 //
diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go
index 8c0d681f..59da198c 100644
--- a/workspace-server/internal/handlers/delegation.go
+++ b/workspace-server/internal/handlers/delegation.go
@@ -78,13 +78,21 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
 	// reason (logged); we still dispatch the A2A request and surface the
 	// warning in the response.
 
-	// Build A2A payload
+	// Build A2A payload. Embedding delegation_id in metadata gives the
+	// queue drain path a way to look up the originating delegation row
+	// when stitching the response back (issue: previously the drain
+	// dispatched successfully but discarded the response, so
+	// check_task_status returned status='queued' forever even after a
+	// real reply landed). messageId mirrors delegation_id so the
+	// platform's idempotency-key extraction also keys off the same id.
 	a2aBody, _ := json.Marshal(map[string]interface{}{
 		"method": "message/send",
 		"params": map[string]interface{}{
 			"message": map[string]interface{}{
-				"role":  "user",
-				"parts": []map[string]interface{}{{"type": "text", "text": body.Task}},
+				"role":      "user",
+				"messageId": delegationID,
+				"parts":     []map[string]interface{}{{"type": "text", "text": body.Task}},
+				"metadata":  map[string]interface{}{"delegation_id": delegationID},
 			},
 		},
 	})
@@ -284,6 +292,40 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
 		return
 	}
 
+	// 202 + {queued: true} means the target was busy and the proxy
+	// enqueued the request for the next drain tick — NOT a completion.
+	// Treat it as such: write a clean 'queued' activity row with no
+	// JSON-as-text leakage into the summary, broadcast a status update,
+	// and return. The eventual drain doesn't (yet) feed a result back
+	// into this delegation, so callers polling check_task_status will
+	// see status='queued' and know to retry instead of believing the
+	// queued JSON is the agent's reply. Fixes the chat-leak where the
+	// LLM echoed "Delegation completed (workspace agent busy ...)" to
+	// the user.
+	if status == http.StatusAccepted && isQueuedProxyResponse(respBody) {
+		log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID)
+		h.updateDelegationStatus(sourceID, delegationID, "queued", "")
+		// Store delegation_id in response_body so DrainQueueForWorkspace's
+		// stitch step can find this row by JSON-path key after the queued
+		// dispatch eventually succeeds. Without the key, the drain finds
+		// the row by (workspace_id, target_id, method) but can't tell
+		// multiple-queued-delegations-to-same-target apart.
+		queuedJSON, _ := json.Marshal(map[string]interface{}{
+			"delegation_id": delegationID,
+			"queued":        true,
+		})
+		if _, err := db.DB.ExecContext(ctx, `
+			INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
+			VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
+		`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
+			log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
+		}
+		h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_STATUS", sourceID, map[string]interface{}{
+			"delegation_id": delegationID, "target_id": targetID, "status": "queued",
+		})
+		return
+	}
+
 	// A2A returned 200 — target received and processed the task
 	// Status: dispatched → received → completed (we don't have a separate "received" signal from the target yet)
 	responseText := extractResponseText(respBody)
@@ -517,6 +559,21 @@ func isTransientProxyError(err *proxyA2AError) bool {
 	return false
 }
 
+// isQueuedProxyResponse reports whether the proxy returned a body shaped like
+// `{"queued": true, "queue_id": ..., "queue_depth": ..., "message": ...}` —
+// the busy-target enqueue path in a2a_proxy_helpers.go. Caller checks this
+// alongside HTTP 202 to distinguish a successful agent reply from a deferred
+// dispatch; without the distinction we'd write the queued-message JSON into
+// the delegation result row and the LLM would surface it as agent output.
+func isQueuedProxyResponse(body []byte) bool {
+	var resp map[string]interface{}
+	if json.Unmarshal(body, &resp) != nil {
+		return false
+	}
+	queued, _ := resp["queued"].(bool)
+	return queued
+}
+
 func extractResponseText(body []byte) string {
 	var resp map[string]interface{}
 	if json.Unmarshal(body, &resp) != nil {
diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go
index caa5118d..21cc3a90 100644
--- a/workspace-server/internal/handlers/delegation_test.go
+++ b/workspace-server/internal/handlers/delegation_test.go
@@ -376,6 +376,39 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) {
 	}
 }
 
+func TestIsQueuedProxyResponse(t *testing.T) {
+	// Regression guard for the chat-leak bug: when the proxy returns
+	// 202 with a queued-shape body, executeDelegation must classify it
+	// as "queued" — not "completed". Mis-classifying it causes the
+	// queued JSON to land in activity_logs.summary, which the LLM then
+	// echoes verbatim into the agent chat as
+	// "Delegation completed: Delegation completed (workspace agent
+	// busy — request queued, will dispatch...)".
+	cases := []struct {
+		name string
+		body string
+		want bool
+	}{
+		{
+			name: "real proxy busy-enqueue body",
+			body: `{"queued":true,"queue_id":"d0993390-5f5a-4f5d-90a2-66639e53e3c9","queue_depth":1,"message":"workspace agent busy — request queued, will dispatch when capacity available"}`,
+			want: true,
+		},
+		{"queued false explicitly", `{"queued":false}`, false},
+		{"queued field absent (real A2A reply)", `{"jsonrpc":"2.0","id":"1","result":{"kind":"message","parts":[{"kind":"text","text":"hi"}]}}`, false},
+		{"non-bool queued value (defensive)", `{"queued":"true"}`, false},
+		{"malformed JSON", `not-json`, false},
+		{"empty body", ``, false},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			if got := isQueuedProxyResponse([]byte(tc.body)); got != tc.want {
+				t.Errorf("isQueuedProxyResponse(%q) = %v, want %v", tc.body, got, tc.want)
+			}
+		})
+	}
+}
+
 func TestDelegationRetryDelay_IsSaneWindow(t *testing.T) {
 	// Regression guard: the retry delay must be long enough for the
 	// reactive URL refresh in proxyA2ARequest to kick in (which involves