From efc48a26f8f77e6cbde2564840237c0782e729b1 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sat, 23 May 2026 01:27:00 +0000 Subject: [PATCH] =?UTF-8?q?fix(scheduler):=20#1696=20=E2=80=94=20detect=20?= =?UTF-8?q?A2A=20adapter=20errors=20in=202xx=20response=20body?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The scheduler recorded last_status='ok' when the adapter SDK threw internally but still returned HTTP 200, because fireSchedule only inspected proxyErr and statusCode. A2A JSON-RPC errors (and plain string error payloads) in the body were treated as success. Changes: - Add a2aErrorFromBody() helper that checks respBody for JSON-RPC {"error":{"message":"..."}} and plain {"error":"..."} shapes. - In fireSchedule, when proxyErr is nil and status is 2xx, call a2aErrorFromBody; if an error message is found, set last_status to "error" and last_error to "A2A adapter error: ". - Add adapterErrorProxy test double + TestFireSchedule_AdapterSDKError regression test. Co-Authored-By: Claude Opus 4.7 --- .../internal/scheduler/scheduler.go | 30 +++++++++++ .../internal/scheduler/scheduler_test.go | 52 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 53f17e0cb..10e67208e 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -433,6 +433,10 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { lastStatus = "error" lastError = fmt.Sprintf("HTTP %d", statusCode) log.Printf("Scheduler: '%s' non-2xx: %d", sched.Name, statusCode) + } else if a2aErr := a2aErrorFromBody(respBody); a2aErr != "" { + lastStatus = "error" + lastError = fmt.Sprintf("A2A adapter error: %s", a2aErr) + log.Printf("Scheduler: '%s' A2A adapter error (HTTP %d): %s", sched.Name, statusCode, a2aErr) } else { log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode) } @@ -808,6 +812,32 @@ func isEmptyResponse(body []byte) bool { return false } +// a2aErrorFromBody extracts an A2A/JSON-RPC error message from a 2xx +// response body. The adapter SDK may return HTTP 200 with an error +// payload when it throws internally; this prevents the scheduler from +// falsely recording last_status='ok'. +// Issue #1696. +func a2aErrorFromBody(body []byte) string { + if len(body) == 0 { + return "" + } + var resp map[string]interface{} + if json.Unmarshal(body, &resp) != nil { + return "" + } + // JSON-RPC style: {"error":{"code":-32603,"message":"..."}} + if errObj, ok := resp["error"].(map[string]interface{}); ok { + if msg, ok := errObj["message"].(string); ok { + return msg + } + } + // Plain style: {"error":"..."} + if errStr, ok := resp["error"].(string); ok { + return errStr + } + return "" +} + // truncation moved to internal/textutil.TruncateBytes (#2962 SSOT). // The original #2026 fix lives in textutil's package docs as canonical // prior art. Ellipsis was previously "..." (3 ASCII bytes); the SSOT diff --git a/workspace-server/internal/scheduler/scheduler_test.go b/workspace-server/internal/scheduler/scheduler_test.go index aaa433698..28272967c 100644 --- a/workspace-server/internal/scheduler/scheduler_test.go +++ b/workspace-server/internal/scheduler/scheduler_test.go @@ -256,6 +256,58 @@ func (p *successProxy) ProxyA2ARequest( return 200, []byte(`{"ok":true}`), nil } +// ── adapterErrorProxy ───────────────────────────────────────────────────────── + +// adapterErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200 +// with a JSON-RPC error body, simulating an adapter SDK that throws internally +// but still completes the HTTP round-trip. Issue #1696. +type adapterErrorProxy struct{} + +func (p *adapterErrorProxy) ProxyA2ARequest( + _ context.Context, _ string, _ []byte, _ string, _ bool, +) (int, []byte, error) { + return 200, []byte(`{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`), nil +} + +// ── TestFireSchedule_AdapterSDKError (#1696) ────────────────────────────────── +// +// When the adapter SDK throws internally and returns HTTP 200 with an error +// payload, fireSchedule must record last_status='error', not 'ok'. + +func TestFireSchedule_AdapterSDKError(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "55555555-dead-beef-0000-000000000005", + WorkspaceID: "66666666-dead-beef-0000-000000000006", + Name: "adapter-err-job", + CronExpr: "0 * * * *", + Timezone: "UTC", + Prompt: "do something", + } + + // active_tasks check → 0 (workspace is idle; proceed to fire) + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0)) + + // Post-fire UPDATE must record last_status='error' with the adapter error message. + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), "error", "A2A adapter error: adapter SDK internal error"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // activity_logs INSERT must carry status='error' and the error detail. + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "error", "A2A adapter error: adapter SDK internal error"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(&adapterErrorProxy{}, nil) + s.fireSchedule(context.Background(), sched) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations — adapter error not recorded correctly: %v", err) + } +} + // ── TestFireSchedule_ComputeNextRunError (#722 Bug 1) ───────────────────────── // // When ComputeNextRun fails (bad cron expression), fireSchedule must NOT write -- 2.52.0