fix(scheduler): enqueue cron ticks on busy agents instead of dropping them #2446

Merged
agent-reviewer merged 2 commits from fix/scheduler-enqueue-cron-on-busy into main 2026-06-08 22:50:47 +00:00
6 changed files with 469 additions and 36 deletions
@@ -225,6 +225,16 @@ func (e *proxyA2AError) Error() string {
return "proxy a2a error"
}
// EnqueueA2A is a method wrapper around the package-level EnqueueA2A function so
// that *WorkspaceHandler satisfies the scheduler's A2AProxy interface. The
// scheduler cannot call the package function directly (it would have to import
// internal/handlers, but handlers already imports internal/scheduler → import
// cycle), so it goes through this method on the proxy it already holds. Used by
// the cron scheduler to durably buffer a tick when the target workspace is busy.
func (h *WorkspaceHandler) EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) {
return EnqueueA2A(ctx, workspaceID, callerID, priority, body, method, idempotencyKey, expiresAt)
}
// ProxyA2ARequest is the public wrapper for proxyA2ARequest, used by the
// cron scheduler and other internal callers that need to send A2A messages
// to workspaces programmatically (not from an HTTP handler).
@@ -97,10 +97,10 @@ type QueuedItem struct {
// returns the new row ID + current queue depth. Caller MUST have already
// determined the target is busy — this function does not check.
//
// Idempotency: when idempotencyKey is non-empty, the partial unique index
// `idx_a2a_queue_idempotency` prevents duplicate active rows for the same
// (workspace_id, idempotency_key). On conflict this returns the existing
// row's ID so the caller's log still points at the live queue entry.
// Idempotency: when idempotencyKey is non-empty, a duplicate active enqueue
// for the same (workspace, key) is collapsed rather than double-buffered. On
// a duplicate this returns the existing row's ID so the caller's log still
// points at the live queue entry.
func EnqueueA2A(
ctx context.Context,
workspaceID, callerID string,
@@ -129,6 +129,32 @@ func EnqueueA2A(
expiresAtArg = *expiresAt
}
// Supersede any already-expired pending row for this same key before we
// insert. The drain path skips expired pending rows, so such a row never
// completes on its own — it lingers in the active set and would block the
// conflict check below, silently swallowing this fresh enqueue. Retiring
// it here (a) frees the active set so the insert below proceeds and (b)
// cleans the stale row up so expired rows don't accumulate. Scoped to the
// idempotency key so unrelated traffic is untouched.
if idempotencyKey != "" {
if _, supErr := db.DB.ExecContext(ctx, `
UPDATE a2a_queue
SET status = 'dropped',
last_error = 'superseded: expired before drain; replaced by a fresh enqueue'
WHERE workspace_id = $1
AND idempotency_key = $2
AND status = 'queued'
AND expires_at IS NOT NULL
AND expires_at <= now()
`, workspaceID, idempotencyKey); supErr != nil {
// Non-fatal: if the cleanup fails we still attempt the insert. Worst
// case the conflict path returns the (stale) existing row's id, which
// is the pre-fix behaviour — no new breakage introduced here.
log.Printf("A2AQueue: supersede-expired cleanup failed for workspace %s key %s: %v",
workspaceID, idempotencyKey, supErr)
}
}
// INSERT ... ON CONFLICT DO NOTHING RETURNING id. The conflict target
// must reference the partial unique INDEX columns + WHERE clause directly
// (Postgres can't reference partial unique indexes by name in
@@ -0,0 +1,160 @@
package handlers
// a2a_queue_enqueue_expired_test.go — regression for CR3 RC 9853.
//
// Bug: a pending buffered tick that expires before the drain reaches it is
// skipped by the drain (it filters out expired pending rows) yet still occupies
// the active set the idempotency check guards. A later tick for the SAME key
// would then collapse onto that dead row and be silently swallowed — the exact
// drop the busy-buffer path was built to prevent.
//
// Fix: EnqueueA2A retires any already-expired pending row for the key BEFORE the
// insert, so the fresh tick buffers (and the stale row is cleaned up) instead of
// being dropped.
//
// These tests use the QueryMatcherEqual mock (setupTestDBForQueueTests) so the
// SQL strings below must match the handler's queries verbatim.
import (
"context"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
const (
enqWorkspaceID = "ws-enq-expired"
enqKey = "sched-aaaa-bbbb" // schedule_id used as idempotency key
enqBody = `{"method":"message/send"}`
enqMethod = "message/send"
)
// expectSupersedeExpired registers the cleanup UPDATE EnqueueA2A issues before
// the insert when an idempotency key is present. rowsRetired is how many expired
// pending rows the UPDATE claims to have dropped.
func expectSupersedeExpired(mock sqlmock.Sqlmock, workspaceID, key string, rowsRetired int64) {
mock.ExpectExec(`
UPDATE a2a_queue
SET status = 'dropped',
last_error = 'superseded: expired before drain; replaced by a fresh enqueue'
WHERE workspace_id = $1
AND idempotency_key = $2
AND status = 'queued'
AND expires_at IS NOT NULL
AND expires_at <= now()
`).
WithArgs(workspaceID, key).
WillReturnResult(sqlmock.NewResult(0, rowsRetired))
}
// expectInsert registers the INSERT ... ON CONFLICT DO NOTHING RETURNING id.
// newID is the id the insert returns (non-conflict / fresh enqueue path).
func expectInsert(mock sqlmock.Sqlmock, newID string) {
mock.ExpectQuery(`
INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key, expires_at)
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
ON CONFLICT (workspace_id, idempotency_key)
WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched')
DO NOTHING
RETURNING id
`).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(newID))
}
// expectDepth registers the trailing queue-depth count query.
func expectDepth(mock sqlmock.Sqlmock, workspaceID string, depth int) {
mock.ExpectQuery(`
SELECT COUNT(*) FROM a2a_queue
WHERE workspace_id = $1 AND status = 'queued'
`).WithArgs(workspaceID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(depth))
}
// TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick is the core CR3 regression:
// an existing expired pending row for a schedule's key must NOT cause the next
// tick's enqueue to be dropped. The expired row is retired first, then the
// fresh tick inserts and returns a NEW id.
func TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick(t *testing.T) {
mock := setupTestDBForQueueTests(t)
// One expired pending row exists for this key and gets retired.
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 1)
// With the active set cleared, the insert proceeds (no conflict) → new id.
const freshID = "fresh-tick-id"
expectInsert(mock, freshID)
expectDepth(mock, enqWorkspaceID, 1)
nextRun := time.Now().Add(30 * time.Second)
id, depth, err := EnqueueA2A(
context.Background(), enqWorkspaceID, "", PriorityTask,
[]byte(enqBody), enqMethod, enqKey, &nextRun,
)
if err != nil {
t.Fatalf("EnqueueA2A returned error: %v", err)
}
if id != freshID {
t.Errorf("expected the fresh tick to enqueue with a new id %q, got %q "+
"(an expired row must not swallow the new tick)", freshID, id)
}
if depth != 1 {
t.Errorf("expected depth 1, got %d", depth)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestEnqueueA2A_NoExpiredRow_NormalEnqueue: when no expired row exists the
// supersede UPDATE simply affects zero rows and the enqueue proceeds normally.
func TestEnqueueA2A_NoExpiredRow_NormalEnqueue(t *testing.T) {
mock := setupTestDBForQueueTests(t)
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0) // nothing to retire
const newID = "new-id"
expectInsert(mock, newID)
expectDepth(mock, enqWorkspaceID, 2)
nextRun := time.Now().Add(30 * time.Second)
id, depth, err := EnqueueA2A(
context.Background(), enqWorkspaceID, "", PriorityTask,
[]byte(enqBody), enqMethod, enqKey, &nextRun,
)
if err != nil {
t.Fatalf("EnqueueA2A returned error: %v", err)
}
if id != newID {
t.Errorf("expected id %q, got %q", newID, id)
}
if depth != 2 {
t.Errorf("expected depth 2, got %d", depth)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestEnqueueA2A_NoKey_SkipsSupersede: with no idempotency key there is no
// active-set conflict to guard, so the supersede cleanup is skipped entirely
// and only the insert + depth queries run.
func TestEnqueueA2A_NoKey_SkipsSupersede(t *testing.T) {
mock := setupTestDBForQueueTests(t)
// No expectSupersedeExpired — it must NOT be issued when key is empty.
const newID = "no-key-id"
expectInsert(mock, newID)
expectDepth(mock, enqWorkspaceID, 1)
id, _, err := EnqueueA2A(
context.Background(), enqWorkspaceID, "", PriorityTask,
[]byte(enqBody), enqMethod, "", nil,
)
if err != nil {
t.Fatalf("EnqueueA2A returned error: %v", err)
}
if id != newID {
t.Errorf("expected id %q, got %q", newID, id)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
+125 -28
View File
@@ -34,6 +34,11 @@ const (
// fireSchedule goroutine indefinitely, which blocked wg.Wait() in
// tick(), which stalled the entire scheduler until operator restart.
dbQueryTimeout = 10 * time.Second
// priorityTask mirrors handlers.PriorityTask (50) — the default FIFO A2A
// queue priority. Duplicated as a local const because the scheduler cannot
// import internal/handlers (handlers imports scheduler → cycle). Buffered
// cron ticks enqueue at the same priority as normal busy-retry A2A work.
priorityTask = 50
)
// sanitizeUTF8 replaces invalid UTF-8 byte sequences with the Unicode
@@ -48,9 +53,14 @@ func sanitizeUTF8(s string) string {
}
// A2AProxy is the interface the scheduler needs to send messages to workspaces.
// WorkspaceHandler.ProxyA2ARequest satisfies this.
// WorkspaceHandler.ProxyA2ARequest + WorkspaceHandler.EnqueueA2A satisfy this.
type A2AProxy interface {
ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error)
// EnqueueA2A durably buffers an A2A message for a busy workspace; the
// drain dispatches it serially when the agent frees. idempotencyKey
// collapses duplicate pending buffers per (workspace,key). Returns the
// buffered entry id, the resulting pending depth, and any error.
EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error)
}
// Broadcaster records events and pushes them to WebSocket clients.
@@ -367,33 +377,6 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
sched.WorkspaceID,
).Scan(&activeTasks, &maxConcurrent)
capCancel()
if capErr == nil && activeTasks >= maxConcurrent {
log.Printf("Scheduler: '%s' workspace %s at capacity (active_tasks=%d, max=%d), deferring up to 2 min",
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent)
// Poll every 10s for up to 2 minutes
waited := false
for i := 0; i < 12; i++ {
time.Sleep(10 * time.Second)
pollCtx, pollCancel := context.WithTimeout(ctx, dbQueryTimeout)
err := db.DB.QueryRowContext(pollCtx,
`SELECT COALESCE(active_tasks, 0), COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks, &maxConcurrent)
pollCancel()
if err != nil || activeTasks < maxConcurrent {
waited = true
break
}
}
if !waited && activeTasks >= maxConcurrent {
log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d, max=%d)",
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent)
s.recordSkipped(ctx, sched, activeTasks)
return
}
log.Printf("Scheduler: '%s' workspace %s has capacity after deferral, firing",
sched.Name, short(sched.WorkspaceID, 12))
}
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
defer cancel()
@@ -402,6 +385,9 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
// The agent sees recent peer messages before acting, enabling cross-agent
// awareness without explicit A2A delegation. Best-effort — if the fetch
// fails or the workspace has no Slack channels, the prompt is unchanged.
//
// Built BEFORE the capacity check so the busy-enqueue path below buffers
// the exact same A2A message the fire path would have dispatched.
prompt := sched.Prompt
if s.channels != nil {
if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" {
@@ -426,6 +412,49 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
return
}
// #969 → durable buffering. When the target workspace is busy
// (active_tasks >= max_concurrent_tasks) we do NOT skip the tick and we do
// NOT block the scheduler goroutine waiting for capacity. Instead we durably
// buffer the cron message, mirroring how busy A2A dispatches already buffer.
// The drain then dispatches it serially the moment the agent frees —
// execution stays one-at-a-time; max_concurrent_tasks is unchanged.
//
// This supersedes the previous "poll then recordSkipped" behavior, which
// dropped scheduled ticks on workspaces that stayed busy across the whole
// poll window.
//
// Idempotency key = sched.ID (the SCHEDULE id), NOT msgID/a random uuid.
// Keying by schedule_id means a busy agent buffers AT MOST ONE pending tick
// per schedule — the latest one wins, the obsolete newer tick is collapsed —
// so we hold the next tick instead of stacking a stale backlog.
if capErr == nil && activeTasks >= maxConcurrent {
// Buffered ticks expire at the next scheduled fire: a tick that's been
// sitting in the queue past when the cron would naturally tick again is
// stale, so let it expire rather than fire late. Best-effort — on a bad
// cron expr we enqueue with no TTL (NULL) rather than block the tick.
var expiresAt *time.Time
if nextRun, nrErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); nrErr == nil {
expiresAt = &nextRun
}
enqCtx, enqCancel := context.WithTimeout(ctx, dbQueryTimeout)
// Empty callerID = canvas-style (source_id NULL), matching the fire path.
qID, depth, enqErr := s.proxy.EnqueueA2A(enqCtx, sched.WorkspaceID, "", priorityTask, a2aBody, "message/send", sched.ID, expiresAt)
enqCancel()
if enqErr != nil {
// Enqueue failed — fall back to recording a skip so the liveness
// view still advances and the operator sees the error, rather than
// silently dropping the tick or firing into a busy agent.
log.Printf("Scheduler: '%s' enqueue on busy workspace %s failed, recording skip: %v",
sched.Name, short(sched.WorkspaceID, 12), enqErr)
s.recordSkipped(ctx, sched, activeTasks)
return
}
log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d, max=%d) — enqueued tick %s (queue depth=%d), will drain when idle",
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent, short(qID, 8), depth)
s.recordQueued(ctx, sched, activeTasks, qID, depth)
return
}
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
// Empty callerID = canvas-style request (bypasses access control, source_id=NULL in activity log).
@@ -727,6 +756,74 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
}
}
// recordQueued advances next_run_at and logs a cron_run activity entry with
// status='queued' when the target workspace was busy and the tick was durably
// buffered instead of fired. Mirrors recordSkipped (#115) but records a buffer,
// not a drop: the drain will dispatch qID serially when the agent frees.
// next_run_at still advances so the liveness view keeps ticking and the NEXT
// cron slot enqueues (the schedule_id idempotency key then holds at most one
// pending tick — the latest — per schedule).
func (s *Scheduler) recordQueued(ctx context.Context, sched scheduleRow, activeTasks int, queueID string, depth int) {
reason := fmt.Sprintf("queued: workspace busy (active_tasks=%d), buffered (id=%s, depth=%d)", activeTasks, short(queueID, 8), depth)
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
var nextRunPtr *time.Time
if nextErr == nil {
nextRunPtr = &nextRun
} else {
// Same guard as recordSkipped/fireSchedule — preserve existing
// next_run_at rather than writing NULL on an unparseable cron expr.
log.Printf("Scheduler: ComputeNextRun error in recordQueued for '%s' (%s) — preserving existing next_run_at: %v",
sched.Name, sched.ID, nextErr)
}
queuedUpdCtx, queuedUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if _, err := db.DB.ExecContext(queuedUpdCtx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = COALESCE($2, next_run_at),
run_count = run_count + 1,
last_status = 'queued',
last_error = $3,
updated_at = now()
WHERE id = $1
`, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: '%s' queued update failed: %v", sched.Name, err)
}
queuedUpdCancel()
cronMeta, marshalErr := json.Marshal(map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"cron_expr": sched.CronExpr,
"queued": true,
"active_tasks": activeTasks,
"queue_id": queueID,
"queue_depth": depth,
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal cronMeta(queued) failed: %v", sched.Name, marshalErr)
} else {
queuedInsCtx, queuedInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if _, err := db.DB.ExecContext(queuedInsCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'queued', $4, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron queued (busy): "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: '%s' queued activity log failed: %v", sched.Name, err)
}
queuedInsCancel()
}
if s.broadcaster != nil {
_ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"reason": reason,
"queued": true,
})
}
}
// repairNullNextRunAt is called once during Start() to recompute next_run_at
// for any enabled schedule where it is NULL — a state left by the pre-#722 bug
// where a ComputeNextRun error caused an UPDATE that wrote NULL.
@@ -73,6 +73,14 @@ type recordingProxy struct {
lastCaller string
lastLogFlag bool
lastWSID string
// enqueue tracking — the busy path calls EnqueueA2A instead of firing.
enqueues int
lastEnqBody []byte
lastEnqKey string
enqQueueID string
enqDepth int
enqErr error
}
func (p *recordingProxy) ProxyA2ARequest(
@@ -89,6 +97,25 @@ func (p *recordingProxy) ProxyA2ARequest(
return p.status, p.body, nil
}
// EnqueueA2A records the busy-path enqueue so tests can assert that a tick on a
// busy workspace was buffered (not fired, not skipped).
func (p *recordingProxy) EnqueueA2A(
_ context.Context, workspaceID, callerID string, _ int, body []byte, _ string, idempotencyKey string, _ *time.Time,
) (string, int, error) {
p.enqueues++
p.lastWSID = workspaceID
p.lastCaller = callerID
p.lastEnqBody = body
p.lastEnqKey = idempotencyKey
if p.enqErr != nil {
return "", 0, p.enqErr
}
if p.enqQueueID == "" {
p.enqQueueID = "q-rec-1"
}
return p.enqQueueID, p.enqDepth, nil
}
// ── connection + fixture helpers ──────────────────────────────────────────
// integrationDB returns the configured integration-test connection or skips
@@ -42,6 +42,13 @@ func (p *panicProxy) ProxyA2ARequest(
panic("simulated A2A proxy panic")
}
// EnqueueA2A satisfies the extended A2AProxy interface; panics like the fire path.
func (p *panicProxy) EnqueueA2A(
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
) (string, int, error) {
panic("simulated A2A enqueue panic")
}
// ── TestLastTickAt_zero ───────────────────────────────────────────────────────
// TestLastTickAt_zero confirms that LastTickAt returns a zero time.Time on a
@@ -210,6 +217,90 @@ func TestShort_helper(t *testing.T) {
}
// ── TestRecordSkipped_writesSkippedStatus ────────────────────────────────────
// ── busyEnqueueProxy + TestFireSchedule_BusyEnqueuesInsteadOfSkipping ──────────
//
// Replaces the old "busy → skip after 2 min" assertion. When the workspace is
// at capacity, fireSchedule must ENQUEUE the tick into the durable a2a_queue
// (keyed by schedule_id) and record last_status='queued' — NOT fire and NOT
// recordSkipped. Proves the scheduled-tick-starvation fix.
type busyEnqueueProxy struct {
fired int
enqueued int
enqKey string
enqMethod string
enqPriority int
}
func (p *busyEnqueueProxy) ProxyA2ARequest(
_ context.Context, _ string, _ []byte, _ string, _ bool,
) (int, []byte, error) {
p.fired++
return 200, []byte(`{"ok":true}`), nil
}
func (p *busyEnqueueProxy) EnqueueA2A(
_ context.Context, _ string, _ string, priority int, _ []byte, method, idempotencyKey string, _ *time.Time,
) (string, int, error) {
p.enqueued++
p.enqKey = idempotencyKey
p.enqMethod = method
p.enqPriority = priority
return "q-busy-1", 1, nil
}
func TestFireSchedule_BusyEnqueuesInsteadOfSkipping(t *testing.T) {
mock := setupTestDB(t)
sched := scheduleRow{
ID: "77777777-dead-beef-0000-000000000007",
WorkspaceID: "88888888-dead-beef-0000-000000000008",
Name: "busy-enqueue-job",
CronExpr: "*/5 * * * *",
Timezone: "UTC",
Prompt: "tick while busy",
}
// Capacity check → active_tasks(2) >= max_concurrent(1): workspace is busy.
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"active_tasks", "max"}).AddRow(2, 1))
// recordQueued UPDATE — binds ($1=sched.ID, $2=nextRunPtr, $3=reason);
// last_status='queued' is a SQL literal, not a bound arg.
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// recordQueued activity_logs INSERT — binds 4 args (workspace_id, summary,
// request_body, error_detail); status='queued' is a SQL literal.
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
proxy := &busyEnqueueProxy{}
s := New(proxy, nil)
s.fireSchedule(context.Background(), sched)
if proxy.fired != 0 {
t.Errorf("busy workspace: ProxyA2ARequest must NOT fire, got %d fires", proxy.fired)
}
if proxy.enqueued != 1 {
t.Fatalf("busy workspace: expected exactly 1 EnqueueA2A, got %d", proxy.enqueued)
}
if proxy.enqKey != sched.ID {
t.Errorf("idempotency key must be schedule_id %q (buffer-latest dedup), got %q", sched.ID, proxy.enqKey)
}
if proxy.enqMethod != "message/send" {
t.Errorf("enqueued method = %q, want \"message/send\"", proxy.enqMethod)
}
if proxy.enqPriority != priorityTask {
t.Errorf("enqueued priority = %d, want priorityTask(%d)", proxy.enqPriority, priorityTask)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations — busy tick not recorded as queued: %v", err)
}
}
// #115 coverage gap: the recordSkipped path wasn't tested at all when it
// first landed. Exercises the UPDATE workspace_schedules + INSERT into
// activity_logs via sqlmock. Broadcaster is nil so we don't need to stub
@@ -257,6 +348,13 @@ func (p *successProxy) ProxyA2ARequest(
return 200, []byte(`{"ok":true}`), nil
}
// EnqueueA2A satisfies the extended A2AProxy interface.
func (p *successProxy) EnqueueA2A(
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
) (string, int, error) {
return "q-success", 1, nil
}
// ── adapterErrorProxy ─────────────────────────────────────────────────────────
// adapterErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200
@@ -270,6 +368,13 @@ func (p *adapterErrorProxy) ProxyA2ARequest(
return 200, []byte(`{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`), nil
}
// EnqueueA2A satisfies the extended A2AProxy interface.
func (p *adapterErrorProxy) EnqueueA2A(
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
) (string, int, error) {
return "q-adaptererr", 1, nil
}
// ── TestFireSchedule_AdapterSDKError (#1696) ──────────────────────────────────
//
// When the adapter SDK throws internally and returns HTTP 200 with an error
@@ -667,6 +772,7 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) {
"recordSkipped must advance next_run_at when workspace is busy (#1029)", err)
}
}
// trigger CI
// ── TestDetectResultKind ───────────────────────────────────────────────────────
@@ -833,10 +939,10 @@ func TestDetectResultKind(t *testing.T) {
//
// When ProxyA2ARequest returns HTTP 200 but the response body contains a
// non-ok result_kind, fireSchedule must:
// 1. Set last_status to the result_kind (not 'ok').
// 2. Set last_error to describe the SDK error.
// 3. Increment consecutive_sdk_errors.
// 4. NOT auto-disable on first occurrence (threshold is 3).
// 1. Set last_status to the result_kind (not 'ok').
// 2. Set last_error to describe the SDK error.
// 3. Increment consecutive_sdk_errors.
// 4. NOT auto-disable on first occurrence (threshold is 3).
//
// This test uses an sdkErrorProxy that returns a rate-limited body and asserts
// the first run is recorded as 'rate_limited' with consecutive_sdk_errors=1
@@ -999,6 +1105,13 @@ func (p *sdkErrorProxy) ProxyA2ARequest(
return 200, body, nil
}
// EnqueueA2A satisfies the extended A2AProxy interface.
func (p *sdkErrorProxy) EnqueueA2A(
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
) (string, int, error) {
return "q-sdkerr", 1, nil
}
// ── TestTruncate_utf8Safe_regression2026 ──────────────────────────────────────
// TestTruncate_utf8Safe_regression2026 locks in the #2026 fix: truncate must