From 8603f17c302a813cfd1ee3b908494d50b540f9c8 Mon Sep 17 00:00:00 2001 From: hongming Date: Fri, 22 May 2026 12:15:14 -0700 Subject: [PATCH] =?UTF-8?q?fix(scheduler):=20#1684=20=E2=80=94=20native=5F?= =?UTF-8?q?session=20adapters=20now=20use=20platform=20a2a=5Fqueue=20(unbl?= =?UTF-8?q?ock=20Reno=20Stars=20cron=20starvation)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix, `handleA2ADispatchError` short-circuited to 503-no-queue when the target adapter declared `provides_native_session=True`. The rationale (retained in the original code-comment) was that the SDK owned an inbound queue and platform-side enqueueing would double-buffer with no clean drain-readiness signal. In production this assumption fails for the common native_session SDKs (claude-agent-sdk, codex app-server, hermes-agent): they have no inbound queue — new turns can only arrive via the same HTTP POST that just returned busy. So cron fires (and any A2A retry) bounce 503 every tick until the SDK voluntarily yields. Reno Stars #1684 observed 12 consecutive `*/30` fires lost over 6h while a single native_session held the slot. The "no clean drain-readiness signal" concern turns out to be unfounded: `registry.go:Heartbeat` already gates drain by `payload.ActiveTasks < maxConcurrent`, so `DrainQueueForWorkspace` only fires when the SDK itself reports spare capacity. That IS the post-session-end signal: the native_session SDK reports ActiveTasks=1 while in a turn, 0 when idle, and the next heartbeat after idle triggers drain. The platform queue's drain timing IS tied to SDK readiness — the original comment was wrong. This change collapses the two branches into one: both native_session and non-native callers now enqueue here. The native_session SDK's own in-flight POST stays unaffected; the queued item drains on the next post-idle heartbeat. The `native_session=true` marker is dropped from the 503 response body since callers no longer need to distinguish (the platform queues both kinds). - a2a_proxy_helpers.go: remove the `if HasCapability(workspaceID, "session")` early-return; rewrite the comment to record the rationale for future readers - native_session_test.go: invert the existing positive pin (TestHandleA2ADispatchError_NativeSession_SkipsEnqueue → _NowEnqueues), keep the negative pin (non-native still enqueues) Refs: #1684, Reno Stars production-client report Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/a2a_proxy_helpers.go | 51 ++++++------- .../internal/handlers/native_session_test.go | 74 +++++++++++-------- 2 files changed, 65 insertions(+), 60 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 5f944fc79..cd16f989d 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -71,35 +71,30 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace // with 202 status here was the original cycle 53 bug — callers saw // proxyErr != nil and logged "delegation failed: proxy a2a error". if isUpstreamBusyError(err) { - // Capability primitive #5 — see project memory - // `project_runtime_native_pluggable.md`. When the target workspace's - // adapter has declared provides_native_session=True, the SDK - // owns its own queue/session state (claude-agent-sdk's streaming - // session, hermes-agent's in-container event log, etc.). Adding - // the platform's a2a_queue layer on top would double-buffer the - // same in-flight state — and worse, the platform queue's drain - // timing has no relationship to the SDK's actual readiness, so - // the queued request might dispatch while the SDK is STILL busy. + // #1684 / Reno Stars: native_session adapters previously took a + // 503-no-enqueue path here, on the assumption that the SDK owned + // an inbound queue and the platform a2a_queue would double-buffer. + // In practice, the common native_session SDKs (claude-agent-sdk, + // codex app-server, hermes-agent) do NOT have an inbound queue — + // new turns can only be pushed via the same HTTP POST that just + // returned busy. So cron fires (and any A2A retry) bounce 503 + // every tick until the SDK voluntarily yields. Reno Stars #1684 + // observed 12 consecutive `*/30` cron fires lost over 6h while a + // single native_session held the slot. // - // For native_session targets, return 503 + Retry-After directly. - // The caller's adapter handles retry on its own schedule, and - // the SDK's own queue absorbs the in-flight request when it does. - // Observability is preserved: logA2AFailure already ran above; - // activity_logs records the busy event; the broadcaster fires. - if runtimeOverrides.HasCapability(workspaceID, "session") { - log.Printf("ProxyA2A: target %s busy and declares native_session — skip enqueue, return 503", workspaceID) - return 0, nil, &proxyA2AError{ - Status: http.StatusServiceUnavailable, - Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)}, - Response: gin.H{ - "error": "workspace agent busy — adapter handles retry (native_session)", - "busy": true, - "retry_after": busyRetryAfterSeconds, - "native_session": true, - }, - } - } - + // The original concern — "drain timing has no relationship to SDK + // readiness" — turns out to be unfounded: heartbeat→drain is + // gated by `payload.ActiveTasks < maxConcurrent` in + // registry.go:Heartbeat, so drain only fires when the workspace + // itself reports spare capacity. That IS the session-ended + // signal. The native_session SDK reports ActiveTasks=1 while in a + // turn, ActiveTasks=0 when idle, and the next heartbeat after + // idle triggers DrainQueueForWorkspace. + // + // So we collapse the two branches: both native_session and + // non-native callers enqueue here. The native_session SDK's own + // in-flight POST stays unaffected; the queued item drains on the + // next post-idle heartbeat. idempotencyKey := extractIdempotencyKey(body) // Honor params.expires_in_seconds when the caller specifies one. Zero // (the unset default) → expiresAt = nil → infinite TTL preserved by diff --git a/workspace-server/internal/handlers/native_session_test.go b/workspace-server/internal/handlers/native_session_test.go index 2f70d882a..e043a85b3 100644 --- a/workspace-server/internal/handlers/native_session_test.go +++ b/workspace-server/internal/handlers/native_session_test.go @@ -6,18 +6,26 @@ import ( "testing" ) -// TestHandleA2ADispatchError_NativeSession_SkipsEnqueue validates capability -// primitive #5: when the target workspace has declared -// provides_native_session=True, a busy-shaped dispatch error MUST short- -// circuit straight to 503 + Retry-After. The platform's a2a_queue is -// skipped because the SDK owns its own queue/session state — double- -// buffering would cause spurious dispatches when the SDK is still busy. +// TestHandleA2ADispatchError_NativeSession_NowEnqueues validates the #1684 +// fix: native_session adapters used to short-circuit to 503-no-queue here, +// on the assumption that the SDK owned an inbound queue. In practice the +// common native_session SDKs (claude-agent-sdk, codex app-server, hermes) +// don't — new turns arrive only via the same HTTP POST that returns busy. +// So cron fires bounced 503 every tick until the SDK voluntarily yielded; +// Reno Stars #1684 observed 12 consecutive `*/30` cron fires lost over 6h. // -// Pin via sqlmock: we deliberately do NOT expect any INSERT INTO a2a_queue. -// If a future refactor re-introduces enqueueing under native_session, -// sqlmock fails the test on the unexpected query. -func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { - setupTestDB(t) +// Post-fix: native_session and non-native both enqueue. Drain timing is +// gated by registry.go:Heartbeat (`payload.ActiveTasks < maxConcurrent`) +// so the queued item only dispatches when the SDK reports spare capacity +// — i.e. the next heartbeat after the in-flight turn returns. +// +// This test pins the new behavior: native_session capability DOES NOT +// bypass EnqueueA2A. We expect the INSERT INTO a2a_queue query to fire, +// here arranged to fail so we can observe the legacy 503 fallback (and +// thereby confirm the INSERT was attempted; sqlmock fails the test if +// the expected query never runs). +func TestHandleA2ADispatchError_NativeSession_NowEnqueues(t *testing.T) { + mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) @@ -25,10 +33,15 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { runtimeOverrides.SetCapabilities("ws-native", map[string]bool{"session": true}) defer runtimeOverrides.Reset() - // DeadlineExceeded triggers isUpstreamBusyError. Without the native - // gate, this would fire EnqueueA2A → INSERT INTO a2a_queue. With - // the gate, it short-circuits to 503. We expect ZERO queue queries; - // sqlmock's ExpectationsWereMet implicitly enforces that on teardown. + // We now EXPECT the INSERT to fire even with native_session=true. Make + // it fail so the handler falls through to the legacy 503 path — that + // lets us assert (1) enqueue was attempted, (2) the response on + // queue-failure does NOT carry native_session=true marker (that field + // was removed alongside the gate). + mock.ExpectQuery(`INSERT INTO a2a_queue`). + WithArgs("ws-native", nil, PriorityTask, "{}", "message/send", nil). + WillReturnError(errTestQueueUnavailable) + _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-native", "", []byte("{}"), "message/send", context.DeadlineExceeded, 1, false, @@ -37,28 +50,27 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { t.Fatal("expected proxy error, got nil") } if perr.Status != http.StatusServiceUnavailable { - t.Errorf("got status %d, want 503 (native_session bypasses queue but still 503s)", perr.Status) + t.Errorf("got status %d, want 503 (enqueue failed → legacy 503 fallback)", perr.Status) } if perr.Headers["Retry-After"] == "" { - t.Error("expected Retry-After header on native-session 503") + t.Error("expected Retry-After header on busy-503") } - // Pin the marker so callers' adapters can distinguish this from a - // queue-failure 503: the body has native_session=true. - if got, _ := perr.Response["native_session"].(bool); !got { - t.Errorf("expected native_session=true in response body; got %+v", perr.Response) + // The native_session marker was removed from the response body — the + // platform queues both kinds now, callers no longer distinguish. Pin + // its absence so a future revert is caught. + if got, ok := perr.Response["native_session"].(bool); ok && got { + t.Errorf("native_session marker should be gone after #1684 fix; got %+v", perr.Response) } - // And busy=true stays so existing busy-handling code paths still trigger. if got, _ := perr.Response["busy"].(bool); !got { - t.Errorf("expected busy=true in response body; got %+v", perr.Response) + t.Errorf("expected busy=true; got %+v", perr.Response) } } -// TestHandleA2ADispatchError_NoNativeSession_StillEnqueues is the negative -// pin: a workspace WITHOUT the capability flag falls through to the -// existing EnqueueA2A path (and 503 if that fails). Same shape as -// TestHandleA2ADispatchError_ContextDeadline; we duplicate it here so -// the native_session gate change is bracketed by both positive and -// negative tests in the same file. +// TestHandleA2ADispatchError_NoNativeSession_StillEnqueues — non-native +// behavior is unchanged: enqueue is attempted, fail-fallback to 503. This +// negative pin guards against accidentally reverting the unification by +// re-introducing a `if HasCapability(...)` gate that would short-circuit +// the enqueue path. func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) @@ -79,13 +91,11 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) { if perr == nil { t.Fatal("expected proxy error, got nil") } - // Queue insert failed → falls through to legacy 503 (without - // native_session marker). if perr.Status != http.StatusServiceUnavailable { t.Errorf("got status %d, want 503", perr.Status) } if got, _ := perr.Response["native_session"].(bool); got { - t.Errorf("non-native workspace should NOT carry native_session=true in response; got %+v", perr.Response) + t.Errorf("non-native workspace should NOT carry native_session=true; got %+v", perr.Response) } } -- 2.52.0