diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 5f944fc79..cd16f989d 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -71,35 +71,30 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace // with 202 status here was the original cycle 53 bug — callers saw // proxyErr != nil and logged "delegation failed: proxy a2a error". if isUpstreamBusyError(err) { - // Capability primitive #5 — see project memory - // `project_runtime_native_pluggable.md`. When the target workspace's - // adapter has declared provides_native_session=True, the SDK - // owns its own queue/session state (claude-agent-sdk's streaming - // session, hermes-agent's in-container event log, etc.). Adding - // the platform's a2a_queue layer on top would double-buffer the - // same in-flight state — and worse, the platform queue's drain - // timing has no relationship to the SDK's actual readiness, so - // the queued request might dispatch while the SDK is STILL busy. + // #1684 / Reno Stars: native_session adapters previously took a + // 503-no-enqueue path here, on the assumption that the SDK owned + // an inbound queue and the platform a2a_queue would double-buffer. + // In practice, the common native_session SDKs (claude-agent-sdk, + // codex app-server, hermes-agent) do NOT have an inbound queue — + // new turns can only be pushed via the same HTTP POST that just + // returned busy. So cron fires (and any A2A retry) bounce 503 + // every tick until the SDK voluntarily yields. Reno Stars #1684 + // observed 12 consecutive `*/30` cron fires lost over 6h while a + // single native_session held the slot. // - // For native_session targets, return 503 + Retry-After directly. - // The caller's adapter handles retry on its own schedule, and - // the SDK's own queue absorbs the in-flight request when it does. - // Observability is preserved: logA2AFailure already ran above; - // activity_logs records the busy event; the broadcaster fires. - if runtimeOverrides.HasCapability(workspaceID, "session") { - log.Printf("ProxyA2A: target %s busy and declares native_session — skip enqueue, return 503", workspaceID) - return 0, nil, &proxyA2AError{ - Status: http.StatusServiceUnavailable, - Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)}, - Response: gin.H{ - "error": "workspace agent busy — adapter handles retry (native_session)", - "busy": true, - "retry_after": busyRetryAfterSeconds, - "native_session": true, - }, - } - } - + // The original concern — "drain timing has no relationship to SDK + // readiness" — turns out to be unfounded: heartbeat→drain is + // gated by `payload.ActiveTasks < maxConcurrent` in + // registry.go:Heartbeat, so drain only fires when the workspace + // itself reports spare capacity. That IS the session-ended + // signal. The native_session SDK reports ActiveTasks=1 while in a + // turn, ActiveTasks=0 when idle, and the next heartbeat after + // idle triggers DrainQueueForWorkspace. + // + // So we collapse the two branches: both native_session and + // non-native callers enqueue here. The native_session SDK's own + // in-flight POST stays unaffected; the queued item drains on the + // next post-idle heartbeat. idempotencyKey := extractIdempotencyKey(body) // Honor params.expires_in_seconds when the caller specifies one. Zero // (the unset default) → expiresAt = nil → infinite TTL preserved by diff --git a/workspace-server/internal/handlers/native_session_test.go b/workspace-server/internal/handlers/native_session_test.go index 2f70d882a..e043a85b3 100644 --- a/workspace-server/internal/handlers/native_session_test.go +++ b/workspace-server/internal/handlers/native_session_test.go @@ -6,18 +6,26 @@ import ( "testing" ) -// TestHandleA2ADispatchError_NativeSession_SkipsEnqueue validates capability -// primitive #5: when the target workspace has declared -// provides_native_session=True, a busy-shaped dispatch error MUST short- -// circuit straight to 503 + Retry-After. The platform's a2a_queue is -// skipped because the SDK owns its own queue/session state — double- -// buffering would cause spurious dispatches when the SDK is still busy. +// TestHandleA2ADispatchError_NativeSession_NowEnqueues validates the #1684 +// fix: native_session adapters used to short-circuit to 503-no-queue here, +// on the assumption that the SDK owned an inbound queue. In practice the +// common native_session SDKs (claude-agent-sdk, codex app-server, hermes) +// don't — new turns arrive only via the same HTTP POST that returns busy. +// So cron fires bounced 503 every tick until the SDK voluntarily yielded; +// Reno Stars #1684 observed 12 consecutive `*/30` cron fires lost over 6h. // -// Pin via sqlmock: we deliberately do NOT expect any INSERT INTO a2a_queue. -// If a future refactor re-introduces enqueueing under native_session, -// sqlmock fails the test on the unexpected query. -func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { - setupTestDB(t) +// Post-fix: native_session and non-native both enqueue. Drain timing is +// gated by registry.go:Heartbeat (`payload.ActiveTasks < maxConcurrent`) +// so the queued item only dispatches when the SDK reports spare capacity +// — i.e. the next heartbeat after the in-flight turn returns. +// +// This test pins the new behavior: native_session capability DOES NOT +// bypass EnqueueA2A. We expect the INSERT INTO a2a_queue query to fire, +// here arranged to fail so we can observe the legacy 503 fallback (and +// thereby confirm the INSERT was attempted; sqlmock fails the test if +// the expected query never runs). +func TestHandleA2ADispatchError_NativeSession_NowEnqueues(t *testing.T) { + mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) @@ -25,10 +33,15 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { runtimeOverrides.SetCapabilities("ws-native", map[string]bool{"session": true}) defer runtimeOverrides.Reset() - // DeadlineExceeded triggers isUpstreamBusyError. Without the native - // gate, this would fire EnqueueA2A → INSERT INTO a2a_queue. With - // the gate, it short-circuits to 503. We expect ZERO queue queries; - // sqlmock's ExpectationsWereMet implicitly enforces that on teardown. + // We now EXPECT the INSERT to fire even with native_session=true. Make + // it fail so the handler falls through to the legacy 503 path — that + // lets us assert (1) enqueue was attempted, (2) the response on + // queue-failure does NOT carry native_session=true marker (that field + // was removed alongside the gate). + mock.ExpectQuery(`INSERT INTO a2a_queue`). + WithArgs("ws-native", nil, PriorityTask, "{}", "message/send", nil). + WillReturnError(errTestQueueUnavailable) + _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-native", "", []byte("{}"), "message/send", context.DeadlineExceeded, 1, false, @@ -37,28 +50,27 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { t.Fatal("expected proxy error, got nil") } if perr.Status != http.StatusServiceUnavailable { - t.Errorf("got status %d, want 503 (native_session bypasses queue but still 503s)", perr.Status) + t.Errorf("got status %d, want 503 (enqueue failed → legacy 503 fallback)", perr.Status) } if perr.Headers["Retry-After"] == "" { - t.Error("expected Retry-After header on native-session 503") + t.Error("expected Retry-After header on busy-503") } - // Pin the marker so callers' adapters can distinguish this from a - // queue-failure 503: the body has native_session=true. - if got, _ := perr.Response["native_session"].(bool); !got { - t.Errorf("expected native_session=true in response body; got %+v", perr.Response) + // The native_session marker was removed from the response body — the + // platform queues both kinds now, callers no longer distinguish. Pin + // its absence so a future revert is caught. + if got, ok := perr.Response["native_session"].(bool); ok && got { + t.Errorf("native_session marker should be gone after #1684 fix; got %+v", perr.Response) } - // And busy=true stays so existing busy-handling code paths still trigger. if got, _ := perr.Response["busy"].(bool); !got { - t.Errorf("expected busy=true in response body; got %+v", perr.Response) + t.Errorf("expected busy=true; got %+v", perr.Response) } } -// TestHandleA2ADispatchError_NoNativeSession_StillEnqueues is the negative -// pin: a workspace WITHOUT the capability flag falls through to the -// existing EnqueueA2A path (and 503 if that fails). Same shape as -// TestHandleA2ADispatchError_ContextDeadline; we duplicate it here so -// the native_session gate change is bracketed by both positive and -// negative tests in the same file. +// TestHandleA2ADispatchError_NoNativeSession_StillEnqueues — non-native +// behavior is unchanged: enqueue is attempted, fail-fallback to 503. This +// negative pin guards against accidentally reverting the unification by +// re-introducing a `if HasCapability(...)` gate that would short-circuit +// the enqueue path. func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) @@ -79,13 +91,11 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) { if perr == nil { t.Fatal("expected proxy error, got nil") } - // Queue insert failed → falls through to legacy 503 (without - // native_session marker). if perr.Status != http.StatusServiceUnavailable { t.Errorf("got status %d, want 503", perr.Status) } if got, _ := perr.Response["native_session"].(bool); got { - t.Errorf("non-native workspace should NOT carry native_session=true in response; got %+v", perr.Response) + t.Errorf("non-native workspace should NOT carry native_session=true; got %+v", perr.Response) } }