fix(scheduler): #1696 — detect SDK-layer errors inside HTTP 200 responses (#1699)
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Waiting to run
Block internal-flavored paths / Block forbidden paths (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 9s
CI / Detect changes (push) Successful in 12s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 15s
SECRET_PATTERNS drift lint / Detect SECRET_PATTERNS drift (push) Successful in 55s
E2E Chat / detect-changes (push) Successful in 17s
E2E API Smoke Test / detect-changes (push) Successful in 17s
Handlers Postgres Integration / detect-changes (push) Successful in 10s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 6s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m27s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 8s
Harness Replays / detect-changes (push) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 5s
CI / Canvas (Next.js) (push) Successful in 4s
CI / Shellcheck (E2E scripts) (push) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 1m48s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 6s
Harness Replays / Harness Replays (push) Successful in 13s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 2m18s
publish-workspace-server-image / build-and-push (push) Successful in 5m11s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m7s
E2E Chat / E2E Chat (push) Successful in 3m58s
CI / all-required (push) Successful in 6m29s
CI / Platform (Go) (push) Successful in 5m42s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 5s
publish-workspace-server-image / Production auto-deploy (push) Successful in 3m12s
lint-bp-context-emit-match / lint-bp-context-emit-match (push) Successful in 1m18s
main-red-watchdog / watchdog (push) Successful in 46s
Sweep stale Cloudflare DNS records / Sweep CF orphans (push) Successful in 15s
gate-check-v3 / gate-check (push) Successful in 45s
ci-required-drift / drift (push) Successful in 1m4s
Sweep stale AWS Secrets Manager secrets / Sweep AWS Secrets Manager (push) Successful in 6s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 7m23s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 8s

Co-authored-by: agent-dev-b <agent-dev-b@agents.moleculesai.app>
Co-committed-by: agent-dev-b <agent-dev-b@agents.moleculesai.app>
This commit was merged in pull request #1699.
This commit is contained in:
2026-05-23 03:19:34 +00:00
committed by hongming
parent b6373e7026
commit 1df028f05b
4 changed files with 445 additions and 2 deletions
@@ -425,6 +425,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
lastStatus := "ok"
lastError := ""
resultKind := ""
if proxyErr != nil {
lastStatus = "error"
lastError = fmt.Sprintf("%v", proxyErr)
@@ -438,7 +439,21 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
lastError = fmt.Sprintf("A2A adapter error: %s", a2aErr)
log.Printf("Scheduler: '%s' A2A adapter error (HTTP %d): %s", sched.Name, statusCode, a2aErr)
} else {
log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode)
// HTTP 200 — inspect response body for SDK-layer errors.
// The claude-code-sdk adapter returns HTTP 200 even when the inner
// LLM call throws (e.g. Max-plan rate-limit, quota exhaustion, SDK
// internal errors). Without this check those failures surface as
// "completed (HTTP 200)" in last_status while the agent chat shows
// errors — a silent failure that hides schedule outages.
// See: #1696.
resultKind = detectResultKind(respBody)
if resultKind != "" && resultKind != "ok" {
lastStatus = resultKind
lastError = fmt.Sprintf("SDK error: result_kind=%s", resultKind)
log.Printf("Scheduler: '%s' SDK error detected — result_kind=%s", sched.Name, resultKind)
} else {
log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode)
}
}
// #795: detect phantom-producing schedules — cron fires successfully
@@ -483,6 +498,54 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
resetCancel()
}
// #1696: track consecutive SDK errors. When the adapter returns HTTP 200
// but the response body signals a non-ok result_kind (rate_limited,
// sdk_error, quota_exhausted), we increment a counter. After 3 consecutive
// SDK errors we auto-disable the schedule and log it — the schedule is
// suffering a persistent LLM-layer failure and firing it again will keep
// producing the same errors while burning tokens.
//
// Only apply when the current lastStatus is a non-ok resultKind (not when
// we already have 'error' from proxyErr or non-2xx HTTP status — those have
// their own failure semantics). Also skip when lastStatus is 'stale' (the
// empty-response escalation path takes priority).
var consecSDK int
if resultKind != "" && resultKind != "ok" {
sdkCtx, sdkCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if err := db.DB.QueryRowContext(sdkCtx, `
UPDATE workspace_schedules
SET consecutive_sdk_errors = consecutive_sdk_errors + 1,
updated_at = now()
WHERE id = $1
RETURNING consecutive_sdk_errors`, sched.ID).Scan(&consecSDK); err != nil {
log.Printf("Scheduler: '%s' SDK-error bump failed: %v", sched.Name, err)
}
sdkCancel()
if consecSDK >= 3 {
log.Printf("Scheduler: '%s' AUTO-DISABLING after %d consecutive SDK errors (workspace %s)",
sched.Name, consecSDK, short(sched.WorkspaceID, 12))
autoDisableCtx, autoDisableCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(autoDisableCtx, `
UPDATE workspace_schedules SET enabled = false, updated_at = now() WHERE id = $1 AND enabled = true`,
sched.ID)
autoDisableCancel()
}
} else {
// Non-SDK-error run — reset the counter.
// Guard: only reset when lastStatus is a clean ok (not 'stale', not
// 'error', not resultKind). An 'ok' resultKind means the SDK is fine
// and we should clear the streak.
if lastStatus == "ok" {
resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
_, _ = db.DB.ExecContext(resetCtx, `
UPDATE workspace_schedules
SET consecutive_sdk_errors = 0,
updated_at = now()
WHERE id = $1`, sched.ID)
resetCancel()
}
}
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
var nextRunPtr *time.Time
if nextErr == nil {
@@ -763,6 +826,73 @@ func (s *Scheduler) sweepPhantomBusy(ctx context.Context) {
}
}
// detectResultKind inspects an A2A response body for SDK-layer error signals
// that are invisible at the HTTP level. The claude-code-sdk adapter returns
// HTTP 200 even when the inner LLM call throws (Max-plan rate-limit, quota
// exhaustion, SDK internal errors) — the error surfaces only in the response
// body under result.kind or result.result_kind.
//
// Returns an empty string when the response is clean (result_kind is "ok" or
// absent). Returns the result_kind value when it is a non-ok signal, so callers
// can propagate it as the schedule's last_status.
//
// Known non-ok kinds:
// - "rate_limited" — LLM API rate-limit hit (Max-plan, etc.)
// - "quota_exhausted" — quota / budget exhausted
// - "sdk_error" — SDK threw an internal error
//
// See #1696.
func detectResultKind(body []byte) string {
if len(body) == 0 {
return ""
}
var top map[string]json.RawMessage
if err := json.Unmarshal(body, &top); err != nil {
return ""
}
// Check result.kind first (canonical JSON-RPC shape).
if rawResult, ok := top["result"]; ok {
var result map[string]json.RawMessage
if err := json.Unmarshal(rawResult, &result); err == nil {
// result.kind (canonical JSON-RPC error envelope field).
if rawKind, ok := result["kind"]; ok {
var k string
if json.Unmarshal(rawKind, &k) == nil && k != "" && k != "ok" {
return k
}
}
// result.result_kind (legacy / alternative field name).
if rawKind, ok := result["result_kind"]; ok {
var k string
if json.Unmarshal(rawKind, &k) == nil && k != "" && k != "ok" {
return k
}
}
}
}
// Top-level error: non-ok HTTP 200 with a structured error in the body.
if rawErr, ok := top["error"]; ok {
var errMsg string
if err := json.Unmarshal(rawErr, &errMsg); err == nil && errMsg != "" {
// Distinguish SDK errors from other errors. SDK-layer errors from the
// Claude Code runtime include specific markers.
lower := strings.ToLower(errMsg)
// Check more specific patterns first (max-plan quota > general rate).
if strings.Contains(lower, "max-plan") || strings.Contains(lower, "quota") || strings.Contains(lower, "budget") {
return "quota_exhausted"
}
if strings.Contains(lower, "rate limit") || strings.Contains(lower, "rate_limit") {
return "rate_limited"
}
if strings.Contains(lower, "claude code returned an error") || strings.Contains(lower, "sdk error") ||
strings.Contains(lower, "api key") || strings.Contains(lower, "authentication") {
return "sdk_error"
}
}
}
return ""
}
// isEmptyResponse checks if an A2A response body indicates the agent
// produced no meaningful output. Catches "(no response generated)" from
// the workspace runtime + genuinely empty/null responses. Used by the
@@ -3,6 +3,7 @@ package scheduler
import (
"context"
"database/sql"
"encoding/json"
"testing"
"time"
"unicode/utf8"
@@ -337,6 +338,12 @@ func TestFireSchedule_ComputeNextRunError(t *testing.T) {
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// #1696 consecutive_sdk_errors reset — successProxy has no result_kind,
// so detectResultKind returns "" and lastStatus="ok" → reset.
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// UPDATE must fire — COALESCE($2, next_run_at) keeps existing value when $2 is nil.
// AnyArg for $2 because it will be nil (ComputeNextRun failed).
mock.ExpectExec(`UPDATE workspace_schedules`).
@@ -592,7 +599,14 @@ func TestFireSchedule_NormalSuccess_AdvancesNextRunAt(t *testing.T) {
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// 3. Normal UPDATE after successful proxy call.
// 3. #1696 consecutive_sdk_errors reset — successProxy response has no
// result_kind in the body, so detectResultKind returns "" and lastStatus
// is "ok" → we hit the SDK-error counter reset branch.
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// 4. Normal UPDATE after successful proxy call.
// Args: $1=sched.ID, $2=nextRunPtr (computed time), $3=lastStatus, $4=lastError
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), "ok", "").
@@ -655,6 +669,281 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) {
}
// trigger CI
// ── TestDetectResultKind ───────────────────────────────────────────────────────
// TestDetectResultKind covers the SDK error detection path: HTTP 200 responses
// with non-ok result_kind in the body must be recognised and returned as the
// kind string, not silently treated as ok.
func TestDetectResultKind(t *testing.T) {
// The test exercises detectResultKind directly so we don't need a full
// fireSchedule mock for this unit-test level.
tests := []struct {
name string
body string
wantKind string
}{
{
name: "clean ok response — empty body",
body: `{}`,
wantKind: "",
},
{
name: "clean ok response — result.kind absent",
body: `{"result":{"parts":[{"text":"hello"}]}}`,
wantKind: "",
},
{
name: "clean ok response — result.kind=ok",
body: `{"result":{"kind":"ok","parts":[{"text":"hello"}]}}`,
wantKind: "",
},
{
name: "clean ok response — result.result_kind=ok",
body: `{"result":{"result_kind":"ok","parts":[{"text":"hello"}]}}`,
wantKind: "",
},
{
name: "SDK error — result.kind=rate_limited",
body: `{"result":{"kind":"rate_limited","parts":[{"text":"error"}]}}`,
wantKind: "rate_limited",
},
{
name: "SDK error — result.kind=quota_exhausted",
body: `{"result":{"kind":"quota_exhausted"}}`,
wantKind: "quota_exhausted",
},
{
name: "SDK error — result.kind=sdk_error",
body: `{"result":{"kind":"sdk_error"}}`,
wantKind: "sdk_error",
},
{
name: "SDK error — result.result_kind=rate_limited",
body: `{"result":{"result_kind":"rate_limited"}}`,
wantKind: "rate_limited",
},
{
name: "SDK error — error string with rate limit",
body: `{"result":{"parts":[]},"error":"An error occurred: rate limit exceeded"}`,
wantKind: "rate_limited",
},
{
name: "SDK error — error string with max-plan",
body: `{"error":"Max-plan rate limit reached"}`,
wantKind: "quota_exhausted",
},
{
name: "SDK error — error string with quota",
body: `{"error":"quota exhausted for model"}`,
wantKind: "quota_exhausted",
},
{
name: "SDK error — error string with sdk error",
body: `{"error":"Claude Code returned an error result: success"}`,
wantKind: "sdk_error",
},
{
name: "SDK error — error string with api key",
body: `{"error":"invalid API key"}`,
wantKind: "sdk_error",
},
{
name: "unknown error string — not an SDK error",
body: `{"error":"something went wrong"}`,
wantKind: "",
},
{
name: "empty response body",
body: ``,
wantKind: "",
},
{
name: "malformed JSON",
body: `not valid json`,
wantKind: "",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := detectResultKind([]byte(tc.body))
if got != tc.wantKind {
t.Errorf("detectResultKind(%q) = %q, want %q", tc.body, got, tc.wantKind)
}
})
}
}
// ── TestFireSchedule_SDKError_RateLimited (#1696) ───────────────────────────────
//
// When ProxyA2ARequest returns HTTP 200 but the response body contains a
// non-ok result_kind, fireSchedule must:
// 1. Set last_status to the result_kind (not 'ok').
// 2. Set last_error to describe the SDK error.
// 3. Increment consecutive_sdk_errors.
// 4. NOT auto-disable on first occurrence (threshold is 3).
//
// This test uses an sdkErrorProxy that returns a rate-limited body and asserts
// the first run is recorded as 'rate_limited' with consecutive_sdk_errors=1
// and enabled=true.
func TestFireSchedule_SDKError_RateLimited(t *testing.T) {
mock := setupTestDB(t)
sched := scheduleRow{
ID: "sdk1-test-sched-0001",
WorkspaceID: "sdk1-test-workspace1",
Name: "rate-limited-job",
CronExpr: "0 * * * *",
Timezone: "UTC",
Prompt: "do work",
}
// 1. active_tasks check → workspace idle
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
// 2. #1696 consecutive_sdk_errors bump — RETURNING gives us count=1.
// Use ExpectQuery (not Exec) because QueryRowContext + RETURNING
// produces a result set consumed via .Scan().
mock.ExpectQuery(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnRows(sqlmock.NewRows([]string{"consecutive_sdk_errors"}).AddRow(1))
// 3. Post-fire UPDATE — last_status='rate_limited', last_error='SDK error: result_kind=rate_limited'
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// 4. activity_logs INSERT
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
s := New(&sdkErrorProxy{kind: "rate_limited"}, nil)
s.fireSchedule(context.Background(), sched)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations for SDK-error first run: %v", err)
}
}
// ── TestFireSchedule_SDKError_AutoDisableOnThirdConsecutive (#1696) ───────────
//
// On the 3rd consecutive SDK error, fireSchedule must auto-disable the
// schedule (enabled=false) in addition to recording the error status.
// Threshold is 3 per #1696 requirement.
func TestFireSchedule_SDKError_AutoDisableOnThirdConsecutive(t *testing.T) {
mock := setupTestDB(t)
sched := scheduleRow{
ID: "sdk2-test-sched-0002",
WorkspaceID: "sdk2-test-workspace2",
Name: "auto-disable-job",
CronExpr: "0 * * * *",
Timezone: "UTC",
Prompt: "do work",
}
// 1. active_tasks check → workspace idle
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
// 2. #1696 consecutive_sdk_errors bump — RETURNING gives count=3 (threshold met).
// Use ExpectQuery (not Exec) because QueryRowContext + RETURNING
// produces a result set consumed via .Scan().
mock.ExpectQuery(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnRows(sqlmock.NewRows([]string{"consecutive_sdk_errors"}).AddRow(3))
// 3. Auto-disable UPDATE — sets enabled=false (schedule has hit 3rd SDK error)
mock.ExpectExec(`UPDATE workspace_schedules SET enabled`).
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// 4. Post-fire UPDATE
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// 5. activity_logs INSERT
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
s := New(&sdkErrorProxy{kind: "rate_limited"}, nil)
s.fireSchedule(context.Background(), sched)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations for SDK-error auto-disable: %v", err)
}
}
// ── TestFireSchedule_SDKError_CounterResetOnCleanRun (#1696) ──────────────────
//
// A clean HTTP-200 run (no SDK error) must reset consecutive_sdk_errors to 0.
// This prevents false auto-disable after intermittent SDK blips.
func TestFireSchedule_SDKError_CounterResetOnCleanRun(t *testing.T) {
mock := setupTestDB(t)
sched := scheduleRow{
ID: "sdk3-test-sched-0003",
WorkspaceID: "sdk3-test-workspace3",
Name: "clean-reset-job",
CronExpr: "30 * * * *",
Timezone: "UTC",
Prompt: "do work",
}
// 1. active_tasks check → workspace idle
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
// 2. No SDK error — #1696 counter is reset to 0
// (lastStatus is 'ok', resultKind is empty, so we go to the reset branch)
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// 3. Post-fire UPDATE
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), "ok", "").
WillReturnResult(sqlmock.NewResult(0, 1))
// 4. activity_logs INSERT
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", "").
WillReturnResult(sqlmock.NewResult(0, 1))
s := New(&successProxy{}, nil)
s.fireSchedule(context.Background(), sched)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations for SDK-error counter reset: %v", err)
}
}
// ── sdkErrorProxy ──────────────────────────────────────────────────────────────
//
// sdkErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200 but
// embeds a non-ok result_kind in the response body, simulating a Claude Code
// SDK that returned 200 but the inner LLM call threw a rate-limit / quota error.
// Used by TestFireSchedule_SDKError_* to cover #1696 SDK error detection.
type sdkErrorProxy struct {
kind string // result_kind value to embed in the response body
}
func (p *sdkErrorProxy) ProxyA2ARequest(
_ context.Context, _ string, _ []byte, _ string, _ bool,
) (int, []byte, error) {
body, _ := json.Marshal(map[string]interface{}{
"result": map[string]interface{}{
"kind": p.kind,
"parts": []map[string]interface{}{{"kind": "text", "text": "(no response generated)"}},
},
})
return 200, body, nil
}
// ── TestTruncate_utf8Safe_regression2026 ──────────────────────────────────────
// TestTruncate_utf8Safe_regression2026 locks in the #2026 fix: truncate must
@@ -0,0 +1,4 @@
-- migration: 20260523000000_schedule_consecutive_sdk_errors.down.sql
-- Reverts #1696 fix for #1696 (consecutive_sdk_errors column)
ALTER TABLE workspace_schedules DROP COLUMN IF EXISTS consecutive_sdk_errors;
@@ -0,0 +1,20 @@
-- migration: 20260523000000_schedule_consecutive_sdk_errors.up.sql
-- Fixes #1696: Add consecutive_sdk_errors counter to track SDK errors (HTTP 200
-- responses where the Claude Code runtime returned a non-ok result_kind).
-- When this counter reaches 3, the scheduler sets last_status='rate_limited'
-- and auto-disables the schedule.
--
-- The core issue: the claude-code-sdk adapter returns HTTP 200 even when the
-- inner LLM call throws (e.g. Max-plan rate-limit). All 3 observed runs logged
-- "completed (HTTP 200)" yet surfaced agent errors in the workspace chat.
-- This counter lets us detect that pattern and escalate appropriately.
ALTER TABLE workspace_schedules
ADD COLUMN IF NOT EXISTS consecutive_sdk_errors INTEGER NOT NULL DEFAULT 0;
COMMENT ON COLUMN workspace_schedules.consecutive_sdk_errors IS
'Count of consecutive scheduler fires where ProxyA2ARequest returned HTTP 200
but the response body contained a non-ok result_kind (e.g. rate_limited,
sdk_error, quota_exhausted). Reset to 0 on any non-SDK-error status.
After 3 consecutive SDK errors the schedule is auto-disabled with
status rate_limited. Fixes #1696.';