fix(scheduler): #1696 — detect A2A adapter errors in 2xx response body #1698
@@ -433,6 +433,10 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
lastStatus = "error"
|
||||
lastError = fmt.Sprintf("HTTP %d", statusCode)
|
||||
log.Printf("Scheduler: '%s' non-2xx: %d", sched.Name, statusCode)
|
||||
} else if a2aErr := a2aErrorFromBody(respBody); a2aErr != "" {
|
||||
lastStatus = "error"
|
||||
lastError = fmt.Sprintf("A2A adapter error: %s", a2aErr)
|
||||
log.Printf("Scheduler: '%s' A2A adapter error (HTTP %d): %s", sched.Name, statusCode, a2aErr)
|
||||
} else {
|
||||
log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode)
|
||||
}
|
||||
@@ -808,6 +812,32 @@ func isEmptyResponse(body []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// a2aErrorFromBody extracts an A2A/JSON-RPC error message from a 2xx
|
||||
// response body. The adapter SDK may return HTTP 200 with an error
|
||||
// payload when it throws internally; this prevents the scheduler from
|
||||
// falsely recording last_status='ok'.
|
||||
// Issue #1696.
|
||||
func a2aErrorFromBody(body []byte) string {
|
||||
if len(body) == 0 {
|
||||
return ""
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if json.Unmarshal(body, &resp) != nil {
|
||||
return ""
|
||||
}
|
||||
// JSON-RPC style: {"error":{"code":-32603,"message":"..."}}
|
||||
if errObj, ok := resp["error"].(map[string]interface{}); ok {
|
||||
if msg, ok := errObj["message"].(string); ok {
|
||||
return msg
|
||||
}
|
||||
}
|
||||
// Plain style: {"error":"..."}
|
||||
if errStr, ok := resp["error"].(string); ok {
|
||||
return errStr
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// truncation moved to internal/textutil.TruncateBytes (#2962 SSOT).
|
||||
// The original #2026 fix lives in textutil's package docs as canonical
|
||||
// prior art. Ellipsis was previously "..." (3 ASCII bytes); the SSOT
|
||||
|
||||
@@ -256,6 +256,58 @@ func (p *successProxy) ProxyA2ARequest(
|
||||
return 200, []byte(`{"ok":true}`), nil
|
||||
}
|
||||
|
||||
// ── adapterErrorProxy ─────────────────────────────────────────────────────────
|
||||
|
||||
// adapterErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200
|
||||
// with a JSON-RPC error body, simulating an adapter SDK that throws internally
|
||||
// but still completes the HTTP round-trip. Issue #1696.
|
||||
type adapterErrorProxy struct{}
|
||||
|
||||
func (p *adapterErrorProxy) ProxyA2ARequest(
|
||||
_ context.Context, _ string, _ []byte, _ string, _ bool,
|
||||
) (int, []byte, error) {
|
||||
return 200, []byte(`{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`), nil
|
||||
}
|
||||
|
||||
// ── TestFireSchedule_AdapterSDKError (#1696) ──────────────────────────────────
|
||||
//
|
||||
// When the adapter SDK throws internally and returns HTTP 200 with an error
|
||||
// payload, fireSchedule must record last_status='error', not 'ok'.
|
||||
|
||||
func TestFireSchedule_AdapterSDKError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
sched := scheduleRow{
|
||||
ID: "55555555-dead-beef-0000-000000000005",
|
||||
WorkspaceID: "66666666-dead-beef-0000-000000000006",
|
||||
Name: "adapter-err-job",
|
||||
CronExpr: "0 * * * *",
|
||||
Timezone: "UTC",
|
||||
Prompt: "do something",
|
||||
}
|
||||
|
||||
// active_tasks check → 0 (workspace is idle; proceed to fire)
|
||||
mock.ExpectQuery(`SELECT COALESCE`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
|
||||
|
||||
// Post-fire UPDATE must record last_status='error' with the adapter error message.
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs(sched.ID, sqlmock.AnyArg(), "error", "A2A adapter error: adapter SDK internal error").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// activity_logs INSERT must carry status='error' and the error detail.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "error", "A2A adapter error: adapter SDK internal error").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
s := New(&adapterErrorProxy{}, nil)
|
||||
s.fireSchedule(context.Background(), sched)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations — adapter error not recorded correctly: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestFireSchedule_ComputeNextRunError (#722 Bug 1) ─────────────────────────
|
||||
//
|
||||
// When ComputeNextRun fails (bad cron expression), fireSchedule must NOT write
|
||||
|
||||
Reference in New Issue
Block a user