diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 10e67208e..186a60be3 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -425,6 +425,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { lastStatus := "ok" lastError := "" + resultKind := "" if proxyErr != nil { lastStatus = "error" lastError = fmt.Sprintf("%v", proxyErr) @@ -438,7 +439,21 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { lastError = fmt.Sprintf("A2A adapter error: %s", a2aErr) log.Printf("Scheduler: '%s' A2A adapter error (HTTP %d): %s", sched.Name, statusCode, a2aErr) } else { - log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode) + // HTTP 200 — inspect response body for SDK-layer errors. + // The claude-code-sdk adapter returns HTTP 200 even when the inner + // LLM call throws (e.g. Max-plan rate-limit, quota exhaustion, SDK + // internal errors). Without this check those failures surface as + // "completed (HTTP 200)" in last_status while the agent chat shows + // errors — a silent failure that hides schedule outages. + // See: #1696. + resultKind = detectResultKind(respBody) + if resultKind != "" && resultKind != "ok" { + lastStatus = resultKind + lastError = fmt.Sprintf("SDK error: result_kind=%s", resultKind) + log.Printf("Scheduler: '%s' SDK error detected — result_kind=%s", sched.Name, resultKind) + } else { + log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode) + } } // #795: detect phantom-producing schedules — cron fires successfully @@ -483,6 +498,54 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { resetCancel() } + // #1696: track consecutive SDK errors. When the adapter returns HTTP 200 + // but the response body signals a non-ok result_kind (rate_limited, + // sdk_error, quota_exhausted), we increment a counter. After 3 consecutive + // SDK errors we auto-disable the schedule and log it — the schedule is + // suffering a persistent LLM-layer failure and firing it again will keep + // producing the same errors while burning tokens. + // + // Only apply when the current lastStatus is a non-ok resultKind (not when + // we already have 'error' from proxyErr or non-2xx HTTP status — those have + // their own failure semantics). Also skip when lastStatus is 'stale' (the + // empty-response escalation path takes priority). + var consecSDK int + if resultKind != "" && resultKind != "ok" { + sdkCtx, sdkCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if err := db.DB.QueryRowContext(sdkCtx, ` + UPDATE workspace_schedules + SET consecutive_sdk_errors = consecutive_sdk_errors + 1, + updated_at = now() + WHERE id = $1 + RETURNING consecutive_sdk_errors`, sched.ID).Scan(&consecSDK); err != nil { + log.Printf("Scheduler: '%s' SDK-error bump failed: %v", sched.Name, err) + } + sdkCancel() + if consecSDK >= 3 { + log.Printf("Scheduler: '%s' AUTO-DISABLING after %d consecutive SDK errors (workspace %s)", + sched.Name, consecSDK, short(sched.WorkspaceID, 12)) + autoDisableCtx, autoDisableCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + _, _ = db.DB.ExecContext(autoDisableCtx, ` + UPDATE workspace_schedules SET enabled = false, updated_at = now() WHERE id = $1 AND enabled = true`, + sched.ID) + autoDisableCancel() + } + } else { + // Non-SDK-error run — reset the counter. + // Guard: only reset when lastStatus is a clean ok (not 'stale', not + // 'error', not resultKind). An 'ok' resultKind means the SDK is fine + // and we should clear the streak. + if lastStatus == "ok" { + resetCtx, resetCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + _, _ = db.DB.ExecContext(resetCtx, ` + UPDATE workspace_schedules + SET consecutive_sdk_errors = 0, + updated_at = now() + WHERE id = $1`, sched.ID) + resetCancel() + } + } + nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()) var nextRunPtr *time.Time if nextErr == nil { @@ -763,6 +826,73 @@ func (s *Scheduler) sweepPhantomBusy(ctx context.Context) { } } +// detectResultKind inspects an A2A response body for SDK-layer error signals +// that are invisible at the HTTP level. The claude-code-sdk adapter returns +// HTTP 200 even when the inner LLM call throws (Max-plan rate-limit, quota +// exhaustion, SDK internal errors) — the error surfaces only in the response +// body under result.kind or result.result_kind. +// +// Returns an empty string when the response is clean (result_kind is "ok" or +// absent). Returns the result_kind value when it is a non-ok signal, so callers +// can propagate it as the schedule's last_status. +// +// Known non-ok kinds: +// - "rate_limited" — LLM API rate-limit hit (Max-plan, etc.) +// - "quota_exhausted" — quota / budget exhausted +// - "sdk_error" — SDK threw an internal error +// +// See #1696. +func detectResultKind(body []byte) string { + if len(body) == 0 { + return "" + } + var top map[string]json.RawMessage + if err := json.Unmarshal(body, &top); err != nil { + return "" + } + // Check result.kind first (canonical JSON-RPC shape). + if rawResult, ok := top["result"]; ok { + var result map[string]json.RawMessage + if err := json.Unmarshal(rawResult, &result); err == nil { + // result.kind (canonical JSON-RPC error envelope field). + if rawKind, ok := result["kind"]; ok { + var k string + if json.Unmarshal(rawKind, &k) == nil && k != "" && k != "ok" { + return k + } + } + // result.result_kind (legacy / alternative field name). + if rawKind, ok := result["result_kind"]; ok { + var k string + if json.Unmarshal(rawKind, &k) == nil && k != "" && k != "ok" { + return k + } + } + } + } + // Top-level error: non-ok HTTP 200 with a structured error in the body. + if rawErr, ok := top["error"]; ok { + var errMsg string + if err := json.Unmarshal(rawErr, &errMsg); err == nil && errMsg != "" { + // Distinguish SDK errors from other errors. SDK-layer errors from the + // Claude Code runtime include specific markers. + lower := strings.ToLower(errMsg) + // Check more specific patterns first (max-plan quota > general rate). + if strings.Contains(lower, "max-plan") || strings.Contains(lower, "quota") || strings.Contains(lower, "budget") { + return "quota_exhausted" + } + if strings.Contains(lower, "rate limit") || strings.Contains(lower, "rate_limit") { + return "rate_limited" + } + if strings.Contains(lower, "claude code returned an error") || strings.Contains(lower, "sdk error") || + strings.Contains(lower, "api key") || strings.Contains(lower, "authentication") { + return "sdk_error" + } + } + } + return "" +} + // isEmptyResponse checks if an A2A response body indicates the agent // produced no meaningful output. Catches "(no response generated)" from // the workspace runtime + genuinely empty/null responses. Used by the diff --git a/workspace-server/internal/scheduler/scheduler_test.go b/workspace-server/internal/scheduler/scheduler_test.go index 28272967c..9f16e9b30 100644 --- a/workspace-server/internal/scheduler/scheduler_test.go +++ b/workspace-server/internal/scheduler/scheduler_test.go @@ -3,6 +3,7 @@ package scheduler import ( "context" "database/sql" + "encoding/json" "testing" "time" "unicode/utf8" @@ -337,6 +338,12 @@ func TestFireSchedule_ComputeNextRunError(t *testing.T) { WithArgs(sched.ID). WillReturnResult(sqlmock.NewResult(0, 1)) + // #1696 consecutive_sdk_errors reset — successProxy has no result_kind, + // so detectResultKind returns "" and lastStatus="ok" → reset. + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID). + WillReturnResult(sqlmock.NewResult(0, 1)) + // UPDATE must fire — COALESCE($2, next_run_at) keeps existing value when $2 is nil. // AnyArg for $2 because it will be nil (ComputeNextRun failed). mock.ExpectExec(`UPDATE workspace_schedules`). @@ -592,7 +599,14 @@ func TestFireSchedule_NormalSuccess_AdvancesNextRunAt(t *testing.T) { WithArgs(sched.ID). WillReturnResult(sqlmock.NewResult(0, 1)) - // 3. Normal UPDATE after successful proxy call. + // 3. #1696 consecutive_sdk_errors reset — successProxy response has no + // result_kind in the body, so detectResultKind returns "" and lastStatus + // is "ok" → we hit the SDK-error counter reset branch. + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 4. Normal UPDATE after successful proxy call. // Args: $1=sched.ID, $2=nextRunPtr (computed time), $3=lastStatus, $4=lastError mock.ExpectExec(`UPDATE workspace_schedules`). WithArgs(sched.ID, sqlmock.AnyArg(), "ok", ""). @@ -655,6 +669,281 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) { } // trigger CI +// ── TestDetectResultKind ─────────────────────────────────────────────────────── + +// TestDetectResultKind covers the SDK error detection path: HTTP 200 responses +// with non-ok result_kind in the body must be recognised and returned as the +// kind string, not silently treated as ok. +func TestDetectResultKind(t *testing.T) { + // The test exercises detectResultKind directly so we don't need a full + // fireSchedule mock for this unit-test level. + tests := []struct { + name string + body string + wantKind string + }{ + { + name: "clean ok response — empty body", + body: `{}`, + wantKind: "", + }, + { + name: "clean ok response — result.kind absent", + body: `{"result":{"parts":[{"text":"hello"}]}}`, + wantKind: "", + }, + { + name: "clean ok response — result.kind=ok", + body: `{"result":{"kind":"ok","parts":[{"text":"hello"}]}}`, + wantKind: "", + }, + { + name: "clean ok response — result.result_kind=ok", + body: `{"result":{"result_kind":"ok","parts":[{"text":"hello"}]}}`, + wantKind: "", + }, + { + name: "SDK error — result.kind=rate_limited", + body: `{"result":{"kind":"rate_limited","parts":[{"text":"error"}]}}`, + wantKind: "rate_limited", + }, + { + name: "SDK error — result.kind=quota_exhausted", + body: `{"result":{"kind":"quota_exhausted"}}`, + wantKind: "quota_exhausted", + }, + { + name: "SDK error — result.kind=sdk_error", + body: `{"result":{"kind":"sdk_error"}}`, + wantKind: "sdk_error", + }, + { + name: "SDK error — result.result_kind=rate_limited", + body: `{"result":{"result_kind":"rate_limited"}}`, + wantKind: "rate_limited", + }, + { + name: "SDK error — error string with rate limit", + body: `{"result":{"parts":[]},"error":"An error occurred: rate limit exceeded"}`, + wantKind: "rate_limited", + }, + { + name: "SDK error — error string with max-plan", + body: `{"error":"Max-plan rate limit reached"}`, + wantKind: "quota_exhausted", + }, + { + name: "SDK error — error string with quota", + body: `{"error":"quota exhausted for model"}`, + wantKind: "quota_exhausted", + }, + { + name: "SDK error — error string with sdk error", + body: `{"error":"Claude Code returned an error result: success"}`, + wantKind: "sdk_error", + }, + { + name: "SDK error — error string with api key", + body: `{"error":"invalid API key"}`, + wantKind: "sdk_error", + }, + { + name: "unknown error string — not an SDK error", + body: `{"error":"something went wrong"}`, + wantKind: "", + }, + { + name: "empty response body", + body: ``, + wantKind: "", + }, + { + name: "malformed JSON", + body: `not valid json`, + wantKind: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := detectResultKind([]byte(tc.body)) + if got != tc.wantKind { + t.Errorf("detectResultKind(%q) = %q, want %q", tc.body, got, tc.wantKind) + } + }) + } +} + +// ── TestFireSchedule_SDKError_RateLimited (#1696) ─────────────────────────────── +// +// When ProxyA2ARequest returns HTTP 200 but the response body contains a +// non-ok result_kind, fireSchedule must: +// 1. Set last_status to the result_kind (not 'ok'). +// 2. Set last_error to describe the SDK error. +// 3. Increment consecutive_sdk_errors. +// 4. NOT auto-disable on first occurrence (threshold is 3). +// +// This test uses an sdkErrorProxy that returns a rate-limited body and asserts +// the first run is recorded as 'rate_limited' with consecutive_sdk_errors=1 +// and enabled=true. +func TestFireSchedule_SDKError_RateLimited(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "sdk1-test-sched-0001", + WorkspaceID: "sdk1-test-workspace1", + Name: "rate-limited-job", + CronExpr: "0 * * * *", + Timezone: "UTC", + Prompt: "do work", + } + + // 1. active_tasks check → workspace idle + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0)) + + // 2. #1696 consecutive_sdk_errors bump — RETURNING gives us count=1. + // Use ExpectQuery (not Exec) because QueryRowContext + RETURNING + // produces a result set consumed via .Scan(). + mock.ExpectQuery(`UPDATE workspace_schedules`). + WithArgs(sched.ID). + WillReturnRows(sqlmock.NewRows([]string{"consecutive_sdk_errors"}).AddRow(1)) + + // 3. Post-fire UPDATE — last_status='rate_limited', last_error='SDK error: result_kind=rate_limited' + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 4. activity_logs INSERT + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(&sdkErrorProxy{kind: "rate_limited"}, nil) + s.fireSchedule(context.Background(), sched) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations for SDK-error first run: %v", err) + } +} + +// ── TestFireSchedule_SDKError_AutoDisableOnThirdConsecutive (#1696) ─────────── +// +// On the 3rd consecutive SDK error, fireSchedule must auto-disable the +// schedule (enabled=false) in addition to recording the error status. +// Threshold is 3 per #1696 requirement. +func TestFireSchedule_SDKError_AutoDisableOnThirdConsecutive(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "sdk2-test-sched-0002", + WorkspaceID: "sdk2-test-workspace2", + Name: "auto-disable-job", + CronExpr: "0 * * * *", + Timezone: "UTC", + Prompt: "do work", + } + + // 1. active_tasks check → workspace idle + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0)) + + // 2. #1696 consecutive_sdk_errors bump — RETURNING gives count=3 (threshold met). + // Use ExpectQuery (not Exec) because QueryRowContext + RETURNING + // produces a result set consumed via .Scan(). + mock.ExpectQuery(`UPDATE workspace_schedules`). + WithArgs(sched.ID). + WillReturnRows(sqlmock.NewRows([]string{"consecutive_sdk_errors"}).AddRow(3)) + + // 3. Auto-disable UPDATE — sets enabled=false (schedule has hit 3rd SDK error) + mock.ExpectExec(`UPDATE workspace_schedules SET enabled`). + WithArgs(sched.ID). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 4. Post-fire UPDATE + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 5. activity_logs INSERT + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "rate_limited", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(&sdkErrorProxy{kind: "rate_limited"}, nil) + s.fireSchedule(context.Background(), sched) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations for SDK-error auto-disable: %v", err) + } +} + +// ── TestFireSchedule_SDKError_CounterResetOnCleanRun (#1696) ────────────────── +// +// A clean HTTP-200 run (no SDK error) must reset consecutive_sdk_errors to 0. +// This prevents false auto-disable after intermittent SDK blips. +func TestFireSchedule_SDKError_CounterResetOnCleanRun(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "sdk3-test-sched-0003", + WorkspaceID: "sdk3-test-workspace3", + Name: "clean-reset-job", + CronExpr: "30 * * * *", + Timezone: "UTC", + Prompt: "do work", + } + + // 1. active_tasks check → workspace idle + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0)) + + // 2. No SDK error — #1696 counter is reset to 0 + // (lastStatus is 'ok', resultKind is empty, so we go to the reset branch) + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 3. Post-fire UPDATE + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), "ok", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 4. activity_logs INSERT + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(&successProxy{}, nil) + s.fireSchedule(context.Background(), sched) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations for SDK-error counter reset: %v", err) + } +} + +// ── sdkErrorProxy ────────────────────────────────────────────────────────────── +// +// sdkErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200 but +// embeds a non-ok result_kind in the response body, simulating a Claude Code +// SDK that returned 200 but the inner LLM call threw a rate-limit / quota error. +// Used by TestFireSchedule_SDKError_* to cover #1696 SDK error detection. +type sdkErrorProxy struct { + kind string // result_kind value to embed in the response body +} + +func (p *sdkErrorProxy) ProxyA2ARequest( + _ context.Context, _ string, _ []byte, _ string, _ bool, +) (int, []byte, error) { + body, _ := json.Marshal(map[string]interface{}{ + "result": map[string]interface{}{ + "kind": p.kind, + "parts": []map[string]interface{}{{"kind": "text", "text": "(no response generated)"}}, + }, + }) + return 200, body, nil +} + // ── TestTruncate_utf8Safe_regression2026 ────────────────────────────────────── // TestTruncate_utf8Safe_regression2026 locks in the #2026 fix: truncate must diff --git a/workspace-server/migrations/20260523000000_schedule_consecutive_sdk_errors.down.sql b/workspace-server/migrations/20260523000000_schedule_consecutive_sdk_errors.down.sql new file mode 100644 index 000000000..cb0923cf9 --- /dev/null +++ b/workspace-server/migrations/20260523000000_schedule_consecutive_sdk_errors.down.sql @@ -0,0 +1,4 @@ +-- migration: 20260523000000_schedule_consecutive_sdk_errors.down.sql +-- Reverts #1696 fix for #1696 (consecutive_sdk_errors column) + +ALTER TABLE workspace_schedules DROP COLUMN IF EXISTS consecutive_sdk_errors; \ No newline at end of file diff --git a/workspace-server/migrations/20260523000000_schedule_consecutive_sdk_errors.up.sql b/workspace-server/migrations/20260523000000_schedule_consecutive_sdk_errors.up.sql new file mode 100644 index 000000000..62fac77ae --- /dev/null +++ b/workspace-server/migrations/20260523000000_schedule_consecutive_sdk_errors.up.sql @@ -0,0 +1,20 @@ +-- migration: 20260523000000_schedule_consecutive_sdk_errors.up.sql +-- Fixes #1696: Add consecutive_sdk_errors counter to track SDK errors (HTTP 200 +-- responses where the Claude Code runtime returned a non-ok result_kind). +-- When this counter reaches 3, the scheduler sets last_status='rate_limited' +-- and auto-disables the schedule. +-- +-- The core issue: the claude-code-sdk adapter returns HTTP 200 even when the +-- inner LLM call throws (e.g. Max-plan rate-limit). All 3 observed runs logged +-- "completed (HTTP 200)" yet surfaced agent errors in the workspace chat. +-- This counter lets us detect that pattern and escalate appropriately. + +ALTER TABLE workspace_schedules + ADD COLUMN IF NOT EXISTS consecutive_sdk_errors INTEGER NOT NULL DEFAULT 0; + +COMMENT ON COLUMN workspace_schedules.consecutive_sdk_errors IS + 'Count of consecutive scheduler fires where ProxyA2ARequest returned HTTP 200 + but the response body contained a non-ok result_kind (e.g. rate_limited, + sdk_error, quota_exhausted). Reset to 0 on any non-SDK-error status. + After 3 consecutive SDK errors the schedule is auto-disabled with + status rate_limited. Fixes #1696.'; \ No newline at end of file