From 6faea202b94ee121e89d5b71f948af79af92b4f5 Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Thu, 23 Apr 2026 15:55:43 -0700
Subject: [PATCH] fix(a2a-queue): nil-safe drain + 202-requeue handling (followup to #1893) (#1896)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix(a2a-queue): nil-safe error extraction in DrainQueueForWorkspace + handle 202-requeue

The drain path called proxyErr.Response["error"].(string) without a
comma-ok assertion. When proxyErr.Response had no "error" key (which
happens in the 202-Accepted-queued branch I added in the same PR — that
response is {"queued": true, "queue_id": ..., "queue_depth": ...}), the
type assertion panicked and killed the platform process. The platform was
down 25 minutes today before this was diagnosed. Fleet went from 30 real
outputs/15min → 0 events.

Two fixes here:

1. Treat 202 Accepted from the inner proxyA2ARequest as "re-queued"
   (target was busy AGAIN). Mark THIS attempt completed; the new queue row
   will be drained on the next heartbeat tick. Don't propagate as failure.

2. Defensive type-assertion when reading the error string. Falls back to
   http.StatusText, then a generic "unknown drain dispatch error" so the
   queue still gets a non-empty error_detail for ops debugging.

Now the drain path can never panic on a malformed proxy response.

Co-Authored-By: Claude Opus 4.7 (1M context)

* fix(a2a-queue): return (202, body, nil) so callers see queued-as-success

Cycle 53 found callers logging 45× 'delegation failed: proxy a2a error'
even though the queue's drain stats showed 48 completions in the same
window.

Investigation: my busy-error path returned

    return http.StatusAccepted, nil, &proxyA2AError{Status: 202, Response: ...}

The non-nil proxyA2AError is the failure signal. Even with status=202,
callers' `if proxyErr != nil` branch fires and logs the request as failed.
The 202 status was meaningless — the response body was nil too, so the
caller never even saw the queue_id/depth metadata.

Fix: return success-shape so callers do NOT enter the error branch:

    respBody, _ := json.Marshal(gin.H{"queued": true, "queue_id": qid, ...})
    return http.StatusAccepted, respBody, nil

Net effect: queue continues to absorb busy-errors (working since #1893),
AND callers correctly record the dispatch as queued-success rather than
failed. Closes the cycle 53 misclassification that was making the queue
look ineffective on activity_logs counts.

Co-Authored-By: Claude Opus 4.7 (1M context)

---------

Co-authored-by: Claude Opus 4.7 (1M context)
Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
---
 .../internal/handlers/a2a_proxy_helpers.go | 27 +++++++++-------
 .../internal/handlers/a2a_queue.go         | 32 ++++++++++++++++---
 2 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go
index bd406b4f..4932de31 100644
--- a/workspace-server/internal/handlers/a2a_proxy_helpers.go
+++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go
@@ -58,24 +58,27 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
 	// Issue #110.
 	//
 	// #1870 Phase 1: before returning 503, enqueue the request for drain
-	// on next heartbeat. Returning 202 Accepted {queued:true} means the
-	// caller records "dispatched — queued" not "failed", eliminating the
-	// fan-out-storm drop pattern.
+	// on next heartbeat. Returning 202 Accepted {queued:true} as a SUCCESS
+	// (not an error) means callers record this as "dispatched — queued"
+	// not "failed", eliminating the fan-out-storm drop pattern.
+	//
+	// Critical: must return (status, body, NIL ERROR) so the caller's
+	// `if proxyErr != nil` branch doesn't fire. Returning a proxyA2AError
+	// with 202 status here was the original cycle 53 bug — callers saw
+	// proxyErr != nil and logged "delegation failed: proxy a2a error".
 	if isUpstreamBusyError(err) {
 		idempotencyKey := extractIdempotencyKey(body)
 		if qid, depth, qerr := EnqueueA2A(
 			ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey,
 		); qerr == nil {
 			log.Printf("ProxyA2A: target %s busy — enqueued as %s (depth=%d)", workspaceID, qid, depth)
-			return http.StatusAccepted, nil, &proxyA2AError{
-				Status: http.StatusAccepted,
-				Response: gin.H{
-					"queued":      true,
-					"queue_id":    qid,
-					"queue_depth": depth,
-					"message":     "workspace agent busy — request queued, will dispatch when capacity available",
-				},
-			}
+			respBody, _ := json.Marshal(gin.H{
+				"queued":      true,
+				"queue_id":    qid,
+				"queue_depth": depth,
+				"message":     "workspace agent busy — request queued, will dispatch when capacity available",
+			})
+			return http.StatusAccepted, respBody, nil
 		} else {
 			// Queue insert failed — fall through to legacy 503 behavior
 			// so callers still retry. We don't want a queue DB hiccup to
diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go
index 177d6b82..dadc9256 100644
--- a/workspace-server/internal/handlers/a2a_queue.go
+++ b/workspace-server/internal/handlers/a2a_queue.go
@@ -16,6 +16,7 @@ import (
 	"encoding/json"
 	"errors"
 	"log"
+	"net/http"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 )
@@ -233,11 +234,34 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
 	}
 	// logActivity=false: the original EnqueueA2A callsite already logged
 	// the dispatch attempt; re-logging here would double-count events.
-	_, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
+	status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
+
+	// 202 Accepted = the dispatch was itself queued again (target still busy).
+	// That's not a failure — the queued item just stays queued naturally on
+	// the next drain tick. Mark this attempt completed so we don't double-
+	// count attempts; the new (re-)queue row already exists.
+	if status == http.StatusAccepted {
+		MarkQueueItemCompleted(ctx, item.ID)
+		log.Printf("A2AQueue drain: %s re-queued (target still busy)", item.ID)
+		return
+	}
+
 	if proxyErr != nil {
-		MarkQueueItemFailed(ctx, item.ID, proxyErr.Response["error"].(string))
-		log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %v",
-			item.ID, item.Attempts, proxyErr.Response["error"])
+		// Defensive: proxyErr.Response is gin.H (map[string]interface{}). The
+		// "error" key is conventionally a string but can be missing or non-
+		// string in edge paths (e.g. a future error builder using a typed
+		// struct). Cast safely so a missing key doesn't crash the platform —
+		// today's outage was caused by an unchecked .(string) here.
+		errMsg, _ := proxyErr.Response["error"].(string)
+		if errMsg == "" {
+			errMsg = http.StatusText(proxyErr.Status)
+			if errMsg == "" {
+				errMsg = "unknown drain dispatch error"
+			}
+		}
+		MarkQueueItemFailed(ctx, item.ID, errMsg)
+		log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %s",
+			item.ID, item.Attempts, errMsg)
 		return
 	}
 	MarkQueueItemCompleted(ctx, item.ID)
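
The crash mechanics from the first commit reduce to a few lines of standalone
Go. The sketch below is illustrative only and is not code from the repo: the
map simply mirrors the 202-queued response body, which carries no "error"
key, and shows why the unchecked assertion panics while the comma-ok form
degrades to the http.StatusText fallback used in the drain fix.

    package main

    import (
        "fmt"
        "net/http"
    )

    func main() {
        // Shape of the 202-queued response body: no "error" key present.
        resp := map[string]interface{}{
            "queued":      true,
            "queue_id":    "q-123",
            "queue_depth": 4,
        }

        // Unchecked assertion (the pre-fix drain code): resp["error"] is a
        // nil interface{}, so this line panics with
        // "interface conversion: interface {} is nil, not string".
        //   msg := resp["error"].(string)

        // Comma-ok assertion (the fixed drain code): ok is false, msg stays
        // empty, and we fall back to a status-derived message instead.
        msg, ok := resp["error"].(string)
        if !ok || msg == "" {
            msg = http.StatusText(http.StatusAccepted)
            if msg == "" {
                msg = "unknown drain dispatch error"
            }
        }
        fmt.Println(msg) // prints "Accepted"
    }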
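
The second commit's return-shape change is easiest to see from the caller's
side. The sketch below is a hypothetical, self-contained reduction: the
proxyA2AError field names come from the diff, but the caller function, the
stub dispatch paths, and the exact log wording are invented for illustration
and are not the real call sites.

    package main

    import (
        "fmt"
        "net/http"
    )

    // Field names taken from the diff; everything else here is a stand-in.
    type proxyA2AError struct {
        Status   int
        Response map[string]interface{}
    }

    // oldBusyPath mimics the pre-fix shape: status 202 but a non-nil error
    // and a nil body, so callers never saw the queue metadata.
    func oldBusyPath() (int, []byte, *proxyA2AError) {
        return http.StatusAccepted, nil, &proxyA2AError{Status: http.StatusAccepted}
    }

    // newBusyPath mimics the fixed shape: status 202, queued body, nil error.
    func newBusyPath() (int, []byte, *proxyA2AError) {
        body := []byte(`{"queued":true,"queue_id":"q-123","queue_depth":4}`)
        return http.StatusAccepted, body, nil
    }

    // dispatch stands in for the real callers, which branch on the error
    // value first and only then look at the status and body.
    func dispatch(proxy func() (int, []byte, *proxyA2AError)) {
        status, body, proxyErr := proxy()
        if proxyErr != nil {
            // The old shape always landed here, so a successfully queued
            // request was logged and counted as a failed delegation.
            fmt.Println("delegation failed: proxy a2a error")
            return
        }
        if status == http.StatusAccepted {
            fmt.Printf("delegation queued: %s\n", body)
            return
        }
        fmt.Println("delegation dispatched")
    }

    func main() {
        dispatch(oldBusyPath) // delegation failed: proxy a2a error
        dispatch(newBusyPath) // delegation queued: {"queued":true,...}
    }

With the old shape both calls take the error branch; with the fixed shape
only genuine dispatch failures do, which is what brings the activity_logs
counts back in line with the queue's drain stats.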