Compare commits

...

1 Commit

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) efc48a26f8 fix(scheduler): #1696 — detect A2A adapter errors in 2xx response body
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 8s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
E2E Chat / detect-changes (pull_request) Successful in 12s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 12s
Harness Replays / detect-changes (pull_request) Successful in 8s
E2E API Smoke Test / detect-changes (pull_request) Successful in 13s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 10s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
sop-checklist / na-declarations (pull_request) N/A: (none)
qa-review / approved (pull_request) Failing after 10s
sop-checklist / review-refire (pull_request) Has been skipped
gate-check-v3 / gate-check (pull_request) Successful in 13s
sop-checklist / all-items-acked (pull_request) Successful in 9s
security-review / approved (pull_request) Failing after 10s
sop-tier-check / tier-check (pull_request) Successful in 5s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 3s
CI / Canvas (Next.js) (pull_request) Successful in 4s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 9s
Harness Replays / Harness Replays (pull_request) Successful in 12s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m9s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m23s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Successful in 5m1s
CI / all-required (pull_request) Successful in 5m50s
audit-force-merge / audit (pull_request) Successful in 14s
The scheduler recorded last_status='ok' when the adapter SDK threw
internally but still returned HTTP 200, because fireSchedule only
inspected proxyErr and statusCode.  A2A JSON-RPC errors (and plain
string error payloads) in the body were treated as success.

Changes:
- Add a2aErrorFromBody() helper that checks respBody for JSON-RPC
  {"error":{"message":"..."}} and plain {"error":"..."} shapes.
- In fireSchedule, when proxyErr is nil and status is 2xx, call
  a2aErrorFromBody; if an error message is found, set last_status to
  "error" and last_error to "A2A adapter error: <msg>".
- Add adapterErrorProxy test double + TestFireSchedule_AdapterSDKError
  regression test.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 01:27:00 +00:00
2 changed files with 82 additions and 0 deletions
@@ -433,6 +433,10 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
lastStatus = "error"
lastError = fmt.Sprintf("HTTP %d", statusCode)
log.Printf("Scheduler: '%s' non-2xx: %d", sched.Name, statusCode)
} else if a2aErr := a2aErrorFromBody(respBody); a2aErr != "" {
lastStatus = "error"
lastError = fmt.Sprintf("A2A adapter error: %s", a2aErr)
log.Printf("Scheduler: '%s' A2A adapter error (HTTP %d): %s", sched.Name, statusCode, a2aErr)
} else {
log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode)
}
@@ -808,6 +812,32 @@ func isEmptyResponse(body []byte) bool {
return false
}
// a2aErrorFromBody extracts an A2A/JSON-RPC error description from a 2xx
// response body. The adapter SDK may return HTTP 200 with an error
// payload when it throws internally; this prevents the scheduler from
// falsely recording last_status='ok'.
// Issue #1696.
//
// Recognised shapes of the top-level "error" member:
//   - JSON-RPC object: {"error":{"code":-32603,"message":"..."}}
//   - plain string:    {"error":"..."}
//
// It returns "" when body is empty, is not a JSON object, or carries no
// "error" member of one of those shapes (null, booleans, numbers and
// arrays are ignored). A non-empty return value means the body reported
// an error.
//
// An error object is always reported, even when its "message" is missing,
// not a string, or empty: in that case the result falls back to the
// numeric "code", or to a generic marker when no numeric code is present.
// Returning "" there would make the caller treat the failed run as a
// success.
func a2aErrorFromBody(body []byte) string {
	if len(body) == 0 {
		return ""
	}
	var resp map[string]interface{}
	if json.Unmarshal(body, &resp) != nil {
		return ""
	}
	switch errVal := resp["error"].(type) {
	case string:
		// Plain style: {"error":"..."}
		return errVal
	case map[string]interface{}:
		// JSON-RPC style: {"error":{"code":-32603,"message":"..."}}
		if msg, ok := errVal["message"].(string); ok && msg != "" {
			return msg
		}
		// encoding/json decodes JSON numbers into float64 when the
		// destination is an interface value.
		if code, ok := errVal["code"].(float64); ok {
			return fmt.Sprintf("JSON-RPC error code %d", int64(code))
		}
		return "unspecified error"
	}
	return ""
}
// truncation moved to internal/textutil.TruncateBytes (#2962 SSOT).
// The original #2026 fix lives in textutil's package docs as canonical
// prior art. Ellipsis was previously "..." (3 ASCII bytes); the SSOT
@@ -256,6 +256,58 @@ func (p *successProxy) ProxyA2ARequest(
return 200, []byte(`{"ok":true}`), nil
}
// ── adapterErrorProxy ─────────────────────────────────────────────────────────

// adapterErrorProxy is a stateless test double. Its ProxyA2ARequest always
// reports HTTP 200 together with a JSON-RPC error envelope and a nil error,
// imitating an adapter SDK that failed internally yet still finished the
// HTTP round-trip. Issue #1696.
type adapterErrorProxy struct{}

// ProxyA2ARequest ignores every argument and returns the canned
// 200 + JSON-RPC error response.
func (*adapterErrorProxy) ProxyA2ARequest(
	context.Context, string, []byte, string, bool,
) (int, []byte, error) {
	const cannedBody = `{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`
	return 200, []byte(cannedBody), nil
}
// ── TestFireSchedule_AdapterSDKError (#1696) ──────────────────────────────────
//
// Regression test: a proxy that answers HTTP 200 with a JSON-RPC error body
// must cause fireSchedule to persist last_status='error' (never 'ok'), both
// in the workspace_schedules UPDATE and in the activity_logs INSERT.
func TestFireSchedule_AdapterSDKError(t *testing.T) {
	// Error text expected in both writes: the "A2A adapter error: " prefix
	// followed by the message carried in adapterErrorProxy's response body.
	const wantLastError = "A2A adapter error: adapter SDK internal error"

	dbMock := setupTestDB(t)

	job := scheduleRow{
		ID:          "55555555-dead-beef-0000-000000000005",
		WorkspaceID: "66666666-dead-beef-0000-000000000006",
		Name:        "adapter-err-job",
		CronExpr:    "0 * * * *",
		Timezone:    "UTC",
		Prompt:      "do something",
	}

	// 1. active_tasks check reports 0, so the workspace is idle and the
	//    schedule proceeds to fire.
	idleRows := sqlmock.NewRows([]string{"coalesce"}).AddRow(0)
	dbMock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(idleRows)

	// 2. The post-fire UPDATE has to store status "error" plus the adapter
	//    error message.
	dbMock.ExpectExec(`UPDATE workspace_schedules`).
		WithArgs(job.ID, sqlmock.AnyArg(), "error", wantLastError).
		WillReturnResult(sqlmock.NewResult(0, 1))

	// 3. The activity_logs INSERT has to carry the same status and detail.
	dbMock.ExpectExec(`INSERT INTO activity_logs`).
		WithArgs(job.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "error", wantLastError).
		WillReturnResult(sqlmock.NewResult(0, 1))

	scheduler := New(&adapterErrorProxy{}, nil)
	scheduler.fireSchedule(context.Background(), job)

	err := dbMock.ExpectationsWereMet()
	if err != nil {
		t.Errorf("unmet DB expectations — adapter error not recorded correctly: %v", err)
	}
}
// ── TestFireSchedule_ComputeNextRunError (#722 Bug 1) ─────────────────────────
//
// When ComputeNextRun fails (bad cron expression), fireSchedule must NOT write