fix(a2a-queue): nil-safe drain + 202-requeue handling (followup to #1893) (#1896)

* fix(a2a-queue): nil-safe error extraction in DrainQueueForWorkspace + handle 202-requeue

The drain path called proxyErr.Response["error"].(string) without a comma-
ok assertion. When proxyErr.Response had no "error" key (which happens in
the 202-Accepted-queued branch I added in the same PR — that response is
{"queued": true, "queue_id": ..., "queue_depth": ...}), the type assertion
panicked and killed the platform process.

The platform was down for 25 minutes today before this was diagnosed.
Fleet output went from 30 real outputs per 15-minute window to 0 events.

Two fixes here:

1. Treat 202 Accepted from the inner proxyA2ARequest as "re-queued"
   (target was busy AGAIN). Mark THIS attempt completed; the new queue
   row will be drained on the next heartbeat tick. Don't propagate as
   failure.

2. Defensive type-assertion when reading the error string. Falls back to
   http.StatusText, then a generic "unknown drain dispatch error" so the
   queue still gets a non-empty error_detail for ops debugging.

Now the drain path no longer panics when the proxy response is missing an
"error" key or carries a non-string value there.
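The fallback chain in fix 2 can be sketched standalone. This is a minimal
illustration, not the platform's actual code: `extractErrMsg` is a
hypothetical helper, and the map literals stand in for `gin.H` response
shapes.

```go
package main

import (
	"fmt"
	"net/http"
)

// extractErrMsg pulls a human-readable message out of a proxy response map.
// A bare resp["error"].(string) panics when the key is absent or non-string;
// the comma-ok form yields "" instead, which we then backfill from the
// HTTP status text, then a generic fallback.
func extractErrMsg(resp map[string]interface{}, status int) string {
	msg, _ := resp["error"].(string) // comma-ok: "" if missing or non-string
	if msg == "" {
		msg = http.StatusText(status) // "" for unknown status codes
	}
	if msg == "" {
		msg = "unknown drain dispatch error"
	}
	return msg
}

func main() {
	// Queued-shape response: no "error" key, like the 202 body in this PR.
	queued := map[string]interface{}{"queued": true, "queue_id": "q-1"}
	fmt.Println(extractErrMsg(queued, http.StatusAccepted)) // "Accepted"

	// Error-shape response: the conventional case.
	failed := map[string]interface{}{"error": "agent crashed"}
	fmt.Println(extractErrMsg(failed, 502)) // "agent crashed"

	// Nil map and unrecognized status: generic fallback.
	fmt.Println(extractErrMsg(nil, 599)) // "unknown drain dispatch error"
}
```

Reading from a nil map is safe in Go (it returns the zero value), so the nil
case costs nothing extra.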

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(a2a-queue): return (202, body, nil) so callers see queued-as-success

Cycle 53 found callers logging 45× 'delegation failed: proxy a2a error'
even though the queue's drain stats showed 48 completions in the same
window. Investigation: my busy-error path returned

  return http.StatusAccepted, nil, &proxyA2AError{Status: 202, Response: ...}

The non-nil proxyA2AError is the failure signal. Even with status=202,
callers' `if proxyErr != nil` branch fires and logs the request as
failed. The 202 status was meaningless — the response body was nil too,
so the caller never even saw the queue_id/depth metadata.

Fix: return a success-shaped result so callers do NOT enter the error branch:

  respBody, _ := json.Marshal(gin.H{"queued": true, "queue_id": qid, ...})
  return http.StatusAccepted, respBody, nil

Net effect: queue continues to absorb busy-errors (working since #1893),
AND callers correctly record the dispatch as queued-success rather than
failed. Closes the cycle 53 misclassification that was making the queue
look ineffective on activity_logs counts.
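The misclassification comes straight from Go error-handling convention: a
caller checks the error value, not the status code, so any non-nil error is a
failure regardless of what status rides along with it. A toy sketch, with
`dispatchBuggy`/`dispatchFixed`/`classify` as hypothetical stand-ins for the
proxy and its callers:

```go
package main

import "fmt"

// proxyError mimics the failure signal the proxy can return alongside a status.
type proxyError struct{ Status int }

func (e *proxyError) Error() string {
	return fmt.Sprintf("proxy a2a error (status=%d)", e.Status)
}

// dispatchBuggy models the pre-fix path: status 202 but a NON-NIL error,
// and a nil body, so the caller never sees the queue metadata either.
func dispatchBuggy() (int, []byte, error) {
	return 202, nil, &proxyError{Status: 202}
}

// dispatchFixed models the post-fix path: nil error, metadata in the body.
func dispatchFixed() (int, []byte, error) {
	return 202, []byte(`{"queued":true,"queue_id":"q-1"}`), nil
}

// classify models the caller: it only inspects the error value.
func classify(status int, _ []byte, err error) string {
	if err != nil { // fires for ANY non-nil error, even with status == 202
		return "delegation failed"
	}
	return "dispatched (queued or sent)"
}

func main() {
	fmt.Println(classify(dispatchBuggy())) // "delegation failed"
	fmt.Println(classify(dispatchFixed())) // "dispatched (queued or sent)"
}
```

With the buggy shape the 202 is invisible to the caller's logging; with the
fixed shape the same outcome is recorded as a queued success.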

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
Hongming Wang 2026-04-23 15:55:43 -07:00 committed by GitHub
parent 30ed7ba0b9
commit 6faea202b9
2 changed files with 43 additions and 16 deletions


@@ -58,24 +58,27 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
 	// Issue #110.
 	//
 	// #1870 Phase 1: before returning 503, enqueue the request for drain
-	// on next heartbeat. Returning 202 Accepted {queued:true} means the
-	// caller records "dispatched — queued" not "failed", eliminating the
-	// fan-out-storm drop pattern.
+	// on next heartbeat. Returning 202 Accepted {queued:true} as a SUCCESS
+	// (not an error) means callers record this as "dispatched — queued"
+	// not "failed", eliminating the fan-out-storm drop pattern.
+	//
+	// Critical: must return (status, body, NIL ERROR) so the caller's
+	// `if proxyErr != nil` branch doesn't fire. Returning a proxyA2AError
+	// with 202 status here was the original cycle 53 bug — callers saw
+	// proxyErr != nil and logged "delegation failed: proxy a2a error".
 	if isUpstreamBusyError(err) {
 		idempotencyKey := extractIdempotencyKey(body)
 		if qid, depth, qerr := EnqueueA2A(
 			ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey,
 		); qerr == nil {
 			log.Printf("ProxyA2A: target %s busy — enqueued as %s (depth=%d)", workspaceID, qid, depth)
-			return http.StatusAccepted, nil, &proxyA2AError{
-				Status: http.StatusAccepted,
-				Response: gin.H{
-					"queued":      true,
-					"queue_id":    qid,
-					"queue_depth": depth,
-					"message":     "workspace agent busy — request queued, will dispatch when capacity available",
-				},
-			}
+			respBody, _ := json.Marshal(gin.H{
+				"queued":      true,
+				"queue_id":    qid,
+				"queue_depth": depth,
+				"message":     "workspace agent busy — request queued, will dispatch when capacity available",
+			})
+			return http.StatusAccepted, respBody, nil
 		} else {
 			// Queue insert failed — fall through to legacy 503 behavior
 			// so callers still retry. We don't want a queue DB hiccup to


@@ -16,6 +16,7 @@ import (
 	"encoding/json"
 	"errors"
 	"log"
+	"net/http"

 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 )
@@ -233,11 +234,34 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
 	}
 	// logActivity=false: the original EnqueueA2A callsite already logged
 	// the dispatch attempt; re-logging here would double-count events.
-	_, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
+	status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
+	// 202 Accepted = the dispatch was itself queued again (target still busy).
+	// That's not a failure — the queued item just stays queued naturally on
+	// the next drain tick. Mark this attempt completed so we don't double-
+	// count attempts; the new (re-)queue row already exists.
+	if status == http.StatusAccepted {
+		MarkQueueItemCompleted(ctx, item.ID)
+		log.Printf("A2AQueue drain: %s re-queued (target still busy)", item.ID)
+		return
+	}
 	if proxyErr != nil {
-		MarkQueueItemFailed(ctx, item.ID, proxyErr.Response["error"].(string))
-		log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %v",
-			item.ID, item.Attempts, proxyErr.Response["error"])
+		// Defensive: proxyErr.Response is gin.H (map[string]interface{}). The
+		// "error" key is conventionally a string but can be missing or non-
+		// string in edge paths (e.g. a future error builder using a typed
+		// struct). Cast safely so a missing key doesn't crash the platform —
+		// today's outage was caused by an unchecked .(string) here.
+		errMsg, _ := proxyErr.Response["error"].(string)
+		if errMsg == "" {
+			errMsg = http.StatusText(proxyErr.Status)
+			if errMsg == "" {
+				errMsg = "unknown drain dispatch error"
+			}
+		}
+		MarkQueueItemFailed(ctx, item.ID, errMsg)
+		log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %s",
+			item.ID, item.Attempts, errMsg)
 		return
 	}
 	MarkQueueItemCompleted(ctx, item.ID)