fix(platform/delegation): classify queued response + stitch drain result back
When proxyA2A returns 202+{queued:true} (target busy → enqueued for drain
on next heartbeat), executeDelegation previously treated it as a successful
completion and ran extractResponseText on the queued JSON. The result was
'Delegation completed (workspace agent busy — request queued, will dispatch...)'
landing in activity_logs.summary, which the LLM then echoed to the user
chat as garbage.
Two fixes:
1. delegation.go: detect queued shape via new isQueuedProxyResponse helper,
write status='queued' with clean summary 'Delegation queued — target at
capacity', store delegation_id in response_body so the drain can stitch
back later. Also embed delegation_id in params.message.metadata + use it
as messageId so the proxy's idempotency-key path keys off the same id.
2. a2a_queue.go: when DrainQueueForWorkspace successfully drains a queued
item, extract delegation_id from the body's metadata and UPDATE the
originating delegate_result row (queued → completed with real
response_body). Broadcast DELEGATION_COMPLETE so the canvas chat feed
flips the queued line to completed in real time.
Together these close the loop so check_task_status reflects ground truth
instead of reporting a perpetual 'queued' status even after the queued
request has eventually drained.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
This commit is contained in:
parent
d949b5b323
commit
d97d7d4768
@ -288,7 +288,7 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
}
|
||||
// logActivity=false: the original EnqueueA2A callsite already logged
|
||||
// the dispatch attempt; re-logging here would double-count events.
|
||||
status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
|
||||
|
||||
// 202 Accepted = the dispatch was itself queued again (target still busy).
|
||||
// That's not a failure — the queued item just stays queued naturally on
|
||||
@ -321,4 +321,89 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
MarkQueueItemCompleted(ctx, item.ID)
|
||||
log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
|
||||
item.ID, workspaceID, item.Attempts)
|
||||
|
||||
// Stitch the response back to the originating delegation row, if this
|
||||
// queue item was a delegation. Without this, check_task_status would
|
||||
// see status='queued' (set by the executeDelegation queued-branch) and
|
||||
// the LLM would think the work was never done. We embed delegation_id
|
||||
// in params.message.metadata at Delegate-handler time; pull it out
|
||||
// here and UPDATE the delegate_result row so the original caller can
|
||||
// observe the real reply.
|
||||
if delegationID := extractDelegationIDFromBody(item.Body); delegationID != "" {
|
||||
h.stitchDrainResponseToDelegation(ctx, callerID, item.WorkspaceID, delegationID, respBody)
|
||||
}
|
||||
}
|
||||
|
||||
// extractDelegationIDFromBody pulls params.message.metadata.delegation_id
// out of an A2A JSON-RPC body. It returns the empty string when the key is
// absent or the body is malformed — the drain treats that as "this queue
// item didn't originate from /workspaces/:id/delegate" and skips the
// stitch, so non-delegation queue uses (cross-workspace peer-direct A2A)
// aren't affected.
func extractDelegationIDFromBody(body []byte) string {
	type metadata struct {
		DelegationID string `json:"delegation_id"`
	}
	type message struct {
		Metadata metadata `json:"metadata"`
	}
	type params struct {
		Message message `json:"message"`
	}
	var envelope struct {
		Params params `json:"params"`
	}
	if json.Unmarshal(body, &envelope) != nil {
		return ""
	}
	return envelope.Params.Message.Metadata.DelegationID
}
|
||||
|
||||
// stitchDrainResponseToDelegation writes the drained response into the
|
||||
// delegation's existing delegate_result row (created with status='queued'
|
||||
// by executeDelegation when the proxy first returned queued). This is the
|
||||
// other half of the loop that closes "queued → completed" so the LLM's
|
||||
// check_task_status reflects ground truth.
|
||||
//
|
||||
// Errors are logged-only — drain is fire-and-forget from Heartbeat, and a
|
||||
// stitch failure shouldn't block other queued items. The delegation will
|
||||
// just remain stuck at 'queued' in this case, which is the pre-fix
|
||||
// behaviour (no regression vs. shipping nothing).
|
||||
func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context, sourceID, targetID, delegationID string, respBody []byte) {
|
||||
if sourceID == "" || delegationID == "" {
|
||||
return
|
||||
}
|
||||
responseText := extractResponseText(respBody)
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"text": responseText,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
res, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE activity_logs
|
||||
SET status = 'completed',
|
||||
summary = $1,
|
||||
response_body = $2::jsonb
|
||||
WHERE workspace_id = $3
|
||||
AND method = 'delegate_result'
|
||||
AND target_id = $4
|
||||
AND response_body->>'delegation_id' = $5
|
||||
`, "Delegation completed ("+truncate(responseText, 80)+")", string(respJSON),
|
||||
sourceID, targetID, delegationID)
|
||||
if err != nil {
|
||||
log.Printf("A2AQueue drain stitch: update failed for delegation %s: %v", delegationID, err)
|
||||
return
|
||||
}
|
||||
if rows, _ := res.RowsAffected(); rows == 0 {
|
||||
log.Printf("A2AQueue drain stitch: no delegate_result row for delegation %s (queued-row may not exist yet)", delegationID)
|
||||
return
|
||||
}
|
||||
log.Printf("A2AQueue drain stitch: delegation %s queued → completed (%d chars)", delegationID, len(responseText))
|
||||
|
||||
// Broadcast DELEGATION_COMPLETE so the canvas chat feed flips the
|
||||
// "⏸ queued" line to "✓ completed" in real time. Without this the
|
||||
// transition only surfaces after the user reloads or polls activity.
|
||||
if h.broadcaster != nil {
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
"target_id": targetID,
|
||||
"response_preview": truncate(responseText, 200),
|
||||
"via": "queue_drain",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -80,6 +80,39 @@ func TestExtractIdempotencyKey_emptyOnMissing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractDelegationIDFromBody(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
body string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "delegation body — metadata.delegation_id present",
|
||||
body: `{"method":"message/send","params":{"message":{"role":"user","messageId":"abc-123","metadata":{"delegation_id":"abc-123"},"parts":[{"type":"text","text":"hi"}]}}}`,
|
||||
want: "abc-123",
|
||||
},
|
||||
{
|
||||
name: "non-delegation body — no metadata (peer-direct A2A)",
|
||||
body: `{"method":"message/send","params":{"message":{"role":"user","messageId":"m-1","parts":[{"type":"text","text":"hi"}]}}}`,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "metadata present but no delegation_id key",
|
||||
body: `{"params":{"message":{"metadata":{"trace_id":"t-1"}}}}`,
|
||||
want: "",
|
||||
},
|
||||
{"malformed JSON", `not json`, ""},
|
||||
{"empty body", ``, ""},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := extractDelegationIDFromBody([]byte(tc.body)); got != tc.want {
|
||||
t.Errorf("extractDelegationIDFromBody = %q, want %q", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// DrainQueueForWorkspace — nil-safe error extraction regression tests
|
||||
//
|
||||
|
||||
@ -78,13 +78,21 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
// reason (logged); we still dispatch the A2A request and surface the
|
||||
// warning in the response.
|
||||
|
||||
// Build A2A payload
|
||||
// Build A2A payload. Embedding delegation_id in metadata gives the
|
||||
// queue drain path a way to look up the originating delegation row
|
||||
// when stitching the response back (issue: previously the drain
|
||||
// dispatched successfully but discarded the response, so
|
||||
// check_task_status returned status='queued' forever even after a
|
||||
// real reply landed). messageId mirrors delegation_id so the
|
||||
// platform's idempotency-key extraction also keys off the same id.
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]interface{}{{"type": "text", "text": body.Task}},
|
||||
"role": "user",
|
||||
"messageId": delegationID,
|
||||
"parts": []map[string]interface{}{{"type": "text", "text": body.Task}},
|
||||
"metadata": map[string]interface{}{"delegation_id": delegationID},
|
||||
},
|
||||
},
|
||||
})
|
||||
@ -284,6 +292,40 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
|
||||
return
|
||||
}
|
||||
|
||||
// 202 + {queued: true} means the target was busy and the proxy
|
||||
// enqueued the request for the next drain tick — NOT a completion.
|
||||
// Treat it as such: write a clean 'queued' activity row with no
|
||||
// JSON-as-text leakage into the summary, broadcast a status update,
|
||||
// and return. The eventual drain doesn't (yet) feed a result back
|
||||
// into this delegation, so callers polling check_task_status will
|
||||
// see status='queued' and know to retry instead of believing the
|
||||
// queued JSON is the agent's reply. Fixes the chat-leak where the
|
||||
// LLM echoed "Delegation completed (workspace agent busy ...)" to
|
||||
// the user.
|
||||
if status == http.StatusAccepted && isQueuedProxyResponse(respBody) {
|
||||
log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID)
|
||||
h.updateDelegationStatus(sourceID, delegationID, "queued", "")
|
||||
// Store delegation_id in response_body so DrainQueueForWorkspace's
|
||||
// stitch step can find this row by JSON-path key after the queued
|
||||
// dispatch eventually succeeds. Without the key, the drain finds
|
||||
// the row by (workspace_id, target_id, method) but can't tell
|
||||
// multiple-queued-delegations-to-same-target apart.
|
||||
queuedJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
"queued": true,
|
||||
})
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
|
||||
`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
|
||||
log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_STATUS", sourceID, map[string]interface{}{
|
||||
"delegation_id": delegationID, "target_id": targetID, "status": "queued",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// A2A returned 200 — target received and processed the task
|
||||
// Status: dispatched → received → completed (we don't have a separate "received" signal from the target yet)
|
||||
responseText := extractResponseText(respBody)
|
||||
@ -517,6 +559,21 @@ func isTransientProxyError(err *proxyA2AError) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// isQueuedProxyResponse reports whether the proxy returned a body shaped like
// `{"queued": true, "queue_id": ..., "queue_depth": ..., "message": ...}` —
// the busy-target enqueue path in a2a_proxy_helpers.go. Caller checks this
// alongside HTTP 202 to distinguish a successful agent reply from a deferred
// dispatch; without the distinction we'd write the queued-message JSON into
// the delegation result row and the LLM would surface it as agent output.
func isQueuedProxyResponse(body []byte) bool {
	var probe struct {
		Queued bool `json:"queued"`
	}
	// Malformed JSON and non-bool "queued" values both fail to decode and
	// are treated as "not queued" — matching the defensive original.
	if err := json.Unmarshal(body, &probe); err != nil {
		return false
	}
	return probe.Queued
}
|
||||
|
||||
func extractResponseText(body []byte) string {
|
||||
var resp map[string]interface{}
|
||||
if json.Unmarshal(body, &resp) != nil {
|
||||
|
||||
@ -376,6 +376,39 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsQueuedProxyResponse(t *testing.T) {
|
||||
// Regression guard for the chat-leak bug: when the proxy returns
|
||||
// 202 with a queued-shape body, executeDelegation must classify it
|
||||
// as "queued" — not "completed". Mis-classifying it causes the
|
||||
// queued JSON to land in activity_logs.summary, which the LLM then
|
||||
// echoes verbatim into the agent chat as
|
||||
// "Delegation completed: Delegation completed (workspace agent
|
||||
// busy — request queued, will dispatch...)".
|
||||
cases := []struct {
|
||||
name string
|
||||
body string
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "real proxy busy-enqueue body",
|
||||
body: `{"queued":true,"queue_id":"d0993390-5f5a-4f5d-90a2-66639e53e3c9","queue_depth":1,"message":"workspace agent busy — request queued, will dispatch when capacity available"}`,
|
||||
want: true,
|
||||
},
|
||||
{"queued false explicitly", `{"queued":false}`, false},
|
||||
{"queued field absent (real A2A reply)", `{"jsonrpc":"2.0","id":"1","result":{"kind":"message","parts":[{"kind":"text","text":"hi"}]}}`, false},
|
||||
{"non-bool queued value (defensive)", `{"queued":"true"}`, false},
|
||||
{"malformed JSON", `not-json`, false},
|
||||
{"empty body", ``, false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := isQueuedProxyResponse([]byte(tc.body)); got != tc.want {
|
||||
t.Errorf("isQueuedProxyResponse(%q) = %v, want %v", tc.body, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationRetryDelay_IsSaneWindow(t *testing.T) {
|
||||
// Regression guard: the retry delay must be long enough for the
|
||||
// reactive URL refresh in proxyA2ARequest to kick in (which involves
|
||||
|
||||
Loading…
Reference in New Issue
Block a user