diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index bd406b4f..4932de31 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -58,24 +58,27 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace // Issue #110. // // #1870 Phase 1: before returning 503, enqueue the request for drain - // on next heartbeat. Returning 202 Accepted {queued:true} means the - // caller records "dispatched — queued" not "failed", eliminating the - // fan-out-storm drop pattern. + // on next heartbeat. Returning 202 Accepted {queued:true} as a SUCCESS + // (not an error) means callers record this as "dispatched — queued" + // not "failed", eliminating the fan-out-storm drop pattern. + // + // Critical: must return (status, body, NIL ERROR) so the caller's + // `if proxyErr != nil` branch doesn't fire. Returning a proxyA2AError + // with 202 status here was the original cycle 53 bug — callers saw + // proxyErr != nil and logged "delegation failed: proxy a2a error". if isUpstreamBusyError(err) { idempotencyKey := extractIdempotencyKey(body) if qid, depth, qerr := EnqueueA2A( ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey, ); qerr == nil { log.Printf("ProxyA2A: target %s busy — enqueued as %s (depth=%d)", workspaceID, qid, depth) - return http.StatusAccepted, nil, &proxyA2AError{ - Status: http.StatusAccepted, - Response: gin.H{ - "queued": true, - "queue_id": qid, - "queue_depth": depth, - "message": "workspace agent busy — request queued, will dispatch when capacity available", - }, - } + respBody, _ := json.Marshal(gin.H{ + "queued": true, + "queue_id": qid, + "queue_depth": depth, + "message": "workspace agent busy — request queued, will dispatch when capacity available", + }) + return http.StatusAccepted, respBody, nil } else { // Queue insert failed — fall through to legacy 503 behavior // so callers still retry. We don't want a queue DB hiccup to diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go index 177d6b82..dadc9256 100644 --- a/workspace-server/internal/handlers/a2a_queue.go +++ b/workspace-server/internal/handlers/a2a_queue.go @@ -16,6 +16,7 @@ import ( "encoding/json" "errors" "log" + "net/http" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" ) @@ -233,11 +234,34 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace } // logActivity=false: the original EnqueueA2A callsite already logged // the dispatch attempt; re-logging here would double-count events. - _, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false) + status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false) + + // 202 Accepted = the dispatch was itself queued again (target still busy). + // That's not a failure — the queued item just stays queued naturally on + // the next drain tick. Mark this attempt completed so we don't double- + // count attempts; the new (re-)queue row already exists. + if status == http.StatusAccepted { + MarkQueueItemCompleted(ctx, item.ID) + log.Printf("A2AQueue drain: %s re-queued (target still busy)", item.ID) + return + } + if proxyErr != nil { - MarkQueueItemFailed(ctx, item.ID, proxyErr.Response["error"].(string)) - log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %v", - item.ID, item.Attempts, proxyErr.Response["error"]) + // Defensive: proxyErr.Response is gin.H (map[string]interface{}). The + // "error" key is conventionally a string but can be missing or non- + // string in edge paths (e.g. a future error builder using a typed + // struct). Cast safely so a missing key doesn't crash the platform — + // today's outage was caused by an unchecked .(string) here. + errMsg, _ := proxyErr.Response["error"].(string) + if errMsg == "" { + errMsg = http.StatusText(proxyErr.Status) + if errMsg == "" { + errMsg = "unknown drain dispatch error" + } + } + MarkQueueItemFailed(ctx, item.ID, errMsg) + log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %s", + item.ID, item.Attempts, errMsg) return } MarkQueueItemCompleted(ctx, item.ID)