Compare commits

...

1 Commits

Author SHA1 Message Date
hongming 8603f17c30 fix(scheduler): #1684 — native_session adapters now use platform a2a_queue (unblock Reno Stars cron starvation)
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 6s
CI / Python Lint & Test (pull_request) Successful in 6s
CI / Detect changes (pull_request) Successful in 9s
E2E Chat / detect-changes (pull_request) Successful in 13s
E2E API Smoke Test / detect-changes (pull_request) Successful in 13s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 11s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 9s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 10s
Harness Replays / detect-changes (pull_request) Successful in 12s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 10s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 11s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 6s
sop-checklist / review-refire (pull_request) Has been skipped
gate-check-v3 / gate-check (pull_request) Successful in 12s
qa-review / approved (pull_request) Failing after 12s
security-review / approved (pull_request) Failing after 10s
sop-tier-check / tier-check (pull_request) Successful in 4s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m12s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
CI / Canvas (Next.js) (pull_request) Successful in 10s
E2E Chat / E2E Chat (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 5s
Harness Replays / Harness Replays (pull_request) Successful in 3s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m47s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m21s
CI / Platform (Go) (pull_request) Successful in 5m17s
CI / all-required (pull_request) Successful in 5m59s
audit-force-merge / audit (pull_request) Successful in 6s
Pre-fix, `handleA2ADispatchError` short-circuited to 503-no-queue when
the target adapter declared `provides_native_session=True`. The rationale
(retained in the original code-comment) was that the SDK owned an inbound
queue and platform-side enqueueing would double-buffer with no clean
drain-readiness signal.

In production this assumption fails for the common native_session SDKs
(claude-agent-sdk, codex app-server, hermes-agent): they have no inbound
queue — new turns can only arrive via the same HTTP POST that just
returned busy. So cron fires (and any A2A retry) bounce 503 every tick
until the SDK voluntarily yields. Reno Stars #1684 observed 12
consecutive `*/30` fires lost over 6h while a single native_session held
the slot.

The "no clean drain-readiness signal" concern turns out to be unfounded:
`registry.go:Heartbeat` already gates drain by `payload.ActiveTasks <
maxConcurrent`, so `DrainQueueForWorkspace` only fires when the SDK
itself reports spare capacity. That IS the post-session-end signal: the
native_session SDK reports ActiveTasks=1 while in a turn, 0 when idle,
and the next heartbeat after idle triggers drain. The platform queue's
drain timing IS tied to SDK readiness — the original comment was wrong.

This change collapses the two branches into one: both native_session and
non-native callers now enqueue here. The native_session SDK's own
in-flight POST stays unaffected; the queued item drains on the next
post-idle heartbeat. The `native_session=true` marker is dropped from
the 503 response body since callers no longer need to distinguish (the
platform queues both kinds).

- a2a_proxy_helpers.go: remove the `if HasCapability(workspaceID,
  "session")` early-return; rewrite the comment to record the rationale
  for future readers
- native_session_test.go: invert the existing positive pin
  (TestHandleA2ADispatchError_NativeSession_SkipsEnqueue →
  _NowEnqueues), keep the negative pin (non-native still enqueues)

Refs: #1684, Reno Stars production-client report

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 12:15:14 -07:00
2 changed files with 65 additions and 60 deletions
@@ -71,35 +71,30 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
// with 202 status here was the original cycle 53 bug — callers saw
// proxyErr != nil and logged "delegation failed: proxy a2a error".
if isUpstreamBusyError(err) {
// Capability primitive #5 — see project memory
// `project_runtime_native_pluggable.md`. When the target workspace's
// adapter has declared provides_native_session=True, the SDK
// owns its own queue/session state (claude-agent-sdk's streaming
// session, hermes-agent's in-container event log, etc.). Adding
// the platform's a2a_queue layer on top would double-buffer the
// same in-flight state — and worse, the platform queue's drain
// timing has no relationship to the SDK's actual readiness, so
// the queued request might dispatch while the SDK is STILL busy.
// #1684 / Reno Stars: native_session adapters previously took a
// 503-no-enqueue path here, on the assumption that the SDK owned
// an inbound queue and the platform a2a_queue would double-buffer.
// In practice, the common native_session SDKs (claude-agent-sdk,
// codex app-server, hermes-agent) do NOT have an inbound queue —
// new turns can only be pushed via the same HTTP POST that just
// returned busy. So cron fires (and any A2A retry) bounce 503
// every tick until the SDK voluntarily yields. Reno Stars #1684
// observed 12 consecutive `*/30` cron fires lost over 6h while a
// single native_session held the slot.
//
// For native_session targets, return 503 + Retry-After directly.
// The caller's adapter handles retry on its own schedule, and
// the SDK's own queue absorbs the in-flight request when it does.
// Observability is preserved: logA2AFailure already ran above;
// activity_logs records the busy event; the broadcaster fires.
if runtimeOverrides.HasCapability(workspaceID, "session") {
log.Printf("ProxyA2A: target %s busy and declares native_session — skip enqueue, return 503", workspaceID)
return 0, nil, &proxyA2AError{
Status: http.StatusServiceUnavailable,
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
Response: gin.H{
"error": "workspace agent busy — adapter handles retry (native_session)",
"busy": true,
"retry_after": busyRetryAfterSeconds,
"native_session": true,
},
}
}
// The original concern — "drain timing has no relationship to SDK
// readiness" — turns out to be unfounded: heartbeat→drain is
// gated by `payload.ActiveTasks < maxConcurrent` in
// registry.go:Heartbeat, so drain only fires when the workspace
// itself reports spare capacity. That IS the session-ended
// signal. The native_session SDK reports ActiveTasks=1 while in a
// turn, ActiveTasks=0 when idle, and the next heartbeat after
// idle triggers DrainQueueForWorkspace.
//
// So we collapse the two branches: both native_session and
// non-native callers enqueue here. The native_session SDK's own
// in-flight POST stays unaffected; the queued item drains on the
// next post-idle heartbeat.
idempotencyKey := extractIdempotencyKey(body)
// Honor params.expires_in_seconds when the caller specifies one. Zero
// (the unset default) → expiresAt = nil → infinite TTL preserved by
@@ -6,18 +6,26 @@ import (
"testing"
)
// TestHandleA2ADispatchError_NativeSession_SkipsEnqueue validates capability
// primitive #5: when the target workspace has declared
// provides_native_session=True, a busy-shaped dispatch error MUST short-
// circuit straight to 503 + Retry-After. The platform's a2a_queue is
// skipped because the SDK owns its own queue/session state — double-
// buffering would cause spurious dispatches when the SDK is still busy.
// TestHandleA2ADispatchError_NativeSession_NowEnqueues validates the #1684
// fix: native_session adapters used to short-circuit to 503-no-queue here,
// on the assumption that the SDK owned an inbound queue. In practice the
// common native_session SDKs (claude-agent-sdk, codex app-server, hermes)
// don't — new turns arrive only via the same HTTP POST that returns busy.
// So cron fires bounced 503 every tick until the SDK voluntarily yielded;
// Reno Stars #1684 observed 12 consecutive `*/30` cron fires lost over 6h.
//
// Pin via sqlmock: we deliberately do NOT expect any INSERT INTO a2a_queue.
// If a future refactor re-introduces enqueueing under native_session,
// sqlmock fails the test on the unexpected query.
func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) {
setupTestDB(t)
// Post-fix: native_session and non-native both enqueue. Drain timing is
// gated by registry.go:Heartbeat (`payload.ActiveTasks < maxConcurrent`)
// so the queued item only dispatches when the SDK reports spare capacity
// — i.e. the next heartbeat after the in-flight turn returns.
//
// This test pins the new behavior: native_session capability DOES NOT
// bypass EnqueueA2A. We expect the INSERT INTO a2a_queue query to fire,
// here arranged to fail so we can observe the legacy 503 fallback (and
// thereby confirm the INSERT was attempted; sqlmock fails the test if
// the expected query never runs).
func TestHandleA2ADispatchError_NativeSession_NowEnqueues(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
@@ -25,10 +33,15 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) {
runtimeOverrides.SetCapabilities("ws-native", map[string]bool{"session": true})
defer runtimeOverrides.Reset()
// DeadlineExceeded triggers isUpstreamBusyError. Without the native
// gate, this would fire EnqueueA2A → INSERT INTO a2a_queue. With
// the gate, it short-circuits to 503. We expect ZERO queue queries;
// sqlmock's ExpectationsWereMet implicitly enforces that on teardown.
// We now EXPECT the INSERT to fire even with native_session=true. Make
// it fail so the handler falls through to the legacy 503 path — that
// lets us assert (1) enqueue was attempted, (2) the response on
// queue-failure does NOT carry native_session=true marker (that field
// was removed alongside the gate).
mock.ExpectQuery(`INSERT INTO a2a_queue`).
WithArgs("ws-native", nil, PriorityTask, "{}", "message/send", nil).
WillReturnError(errTestQueueUnavailable)
_, _, perr := handler.handleA2ADispatchError(
context.Background(), "ws-native", "", []byte("{}"), "message/send",
context.DeadlineExceeded, 1, false,
@@ -37,28 +50,27 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) {
t.Fatal("expected proxy error, got nil")
}
if perr.Status != http.StatusServiceUnavailable {
t.Errorf("got status %d, want 503 (native_session bypasses queue but still 503s)", perr.Status)
t.Errorf("got status %d, want 503 (enqueue failed → legacy 503 fallback)", perr.Status)
}
if perr.Headers["Retry-After"] == "" {
t.Error("expected Retry-After header on native-session 503")
t.Error("expected Retry-After header on busy-503")
}
// Pin the marker so callers' adapters can distinguish this from a
// queue-failure 503: the body has native_session=true.
if got, _ := perr.Response["native_session"].(bool); !got {
t.Errorf("expected native_session=true in response body; got %+v", perr.Response)
// The native_session marker was removed from the response body — the
// platform queues both kinds now, callers no longer distinguish. Pin
// its absence so a future revert is caught.
if got, ok := perr.Response["native_session"].(bool); ok && got {
t.Errorf("native_session marker should be gone after #1684 fix; got %+v", perr.Response)
}
// And busy=true stays so existing busy-handling code paths still trigger.
if got, _ := perr.Response["busy"].(bool); !got {
t.Errorf("expected busy=true in response body; got %+v", perr.Response)
t.Errorf("expected busy=true; got %+v", perr.Response)
}
}
// TestHandleA2ADispatchError_NoNativeSession_StillEnqueues is the negative
// pin: a workspace WITHOUT the capability flag falls through to the
// existing EnqueueA2A path (and 503 if that fails). Same shape as
// TestHandleA2ADispatchError_ContextDeadline; we duplicate it here so
// the native_session gate change is bracketed by both positive and
// negative tests in the same file.
// TestHandleA2ADispatchError_NoNativeSession_StillEnqueues — non-native
// behavior is unchanged: enqueue is attempted, fail-fallback to 503. This
// negative pin guards against accidentally reverting the unification by
// re-introducing a `if HasCapability(...)` gate that would short-circuit
// the enqueue path.
func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
@@ -79,13 +91,11 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) {
if perr == nil {
t.Fatal("expected proxy error, got nil")
}
// Queue insert failed → falls through to legacy 503 (without
// native_session marker).
if perr.Status != http.StatusServiceUnavailable {
t.Errorf("got status %d, want 503", perr.Status)
}
if got, _ := perr.Response["native_session"].(bool); got {
t.Errorf("non-native workspace should NOT carry native_session=true in response; got %+v", perr.Response)
t.Errorf("non-native workspace should NOT carry native_session=true; got %+v", perr.Response)
}
}