diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index b1fc19ae9..de46aa339 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -225,6 +225,16 @@ func (e *proxyA2AError) Error() string { return "proxy a2a error" } +// EnqueueA2A is a method wrapper around the package-level EnqueueA2A function so +// that *WorkspaceHandler satisfies the scheduler's A2AProxy interface. The +// scheduler cannot call the package function directly (it would have to import +// internal/handlers, but handlers already imports internal/scheduler → import +// cycle), so it goes through this method on the proxy it already holds. Used by +// the cron scheduler to durably buffer a tick when the target workspace is busy. +func (h *WorkspaceHandler) EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) { + return EnqueueA2A(ctx, workspaceID, callerID, priority, body, method, idempotencyKey, expiresAt) +} + // ProxyA2ARequest is the public wrapper for proxyA2ARequest, used by the // cron scheduler and other internal callers that need to send A2A messages // to workspaces programmatically (not from an HTTP handler). diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go index 7997dda3e..c2c3841bd 100644 --- a/workspace-server/internal/handlers/a2a_queue.go +++ b/workspace-server/internal/handlers/a2a_queue.go @@ -97,10 +97,10 @@ type QueuedItem struct { // returns the new row ID + current queue depth. Caller MUST have already // determined the target is busy — this function does not check. // -// Idempotency: when idempotencyKey is non-empty, the partial unique index -// `idx_a2a_queue_idempotency` prevents duplicate active rows for the same -// (workspace_id, idempotency_key). 
On conflict this returns the existing -// row's ID so the caller's log still points at the live queue entry. +// Idempotency: when idempotencyKey is non-empty, a duplicate active enqueue +// for the same (workspace, key) is collapsed rather than double-buffered. On +// a duplicate this returns the existing row's ID so the caller's log still +// points at the live queue entry. func EnqueueA2A( ctx context.Context, workspaceID, callerID string, @@ -129,6 +129,32 @@ func EnqueueA2A( expiresAtArg = *expiresAt } + // Supersede any already-expired pending row for this same key before we + // insert. The drain path skips expired pending rows, so such a row never + // completes on its own — it lingers in the active set and would block the + // conflict check below, silently swallowing this fresh enqueue. Retiring + // it here (a) frees the active set so the insert below proceeds and (b) + // cleans the stale row up so expired rows don't accumulate. Scoped to the + // idempotency key so unrelated traffic is untouched. + if idempotencyKey != "" { + if _, supErr := db.DB.ExecContext(ctx, ` + UPDATE a2a_queue + SET status = 'dropped', + last_error = 'superseded: expired before drain; replaced by a fresh enqueue' + WHERE workspace_id = $1 + AND idempotency_key = $2 + AND status = 'queued' + AND expires_at IS NOT NULL + AND expires_at <= now() + `, workspaceID, idempotencyKey); supErr != nil { + // Non-fatal: if the cleanup fails we still attempt the insert. Worst + // case the conflict path returns the (stale) existing row's id, which + // is the pre-fix behaviour — no new breakage introduced here. + log.Printf("A2AQueue: supersede-expired cleanup failed for workspace %s key %s: %v", + workspaceID, idempotencyKey, supErr) + } + } + // INSERT ... ON CONFLICT DO NOTHING RETURNING id. 
The conflict target // must reference the partial unique INDEX columns + WHERE clause directly // (Postgres can't reference partial unique indexes by name in diff --git a/workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go b/workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go new file mode 100644 index 000000000..d2b1c2e6e --- /dev/null +++ b/workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go @@ -0,0 +1,160 @@ +package handlers + +// a2a_queue_enqueue_expired_test.go — regression for CR3 RC 9853. +// +// Bug: a pending buffered tick that expires before the drain reaches it is +// skipped by the drain (it filters out expired pending rows) yet still occupies +// the active set the idempotency check guards. A later tick for the SAME key +// would then collapse onto that dead row and be silently swallowed — the exact +// drop the busy-buffer path was built to prevent. +// +// Fix: EnqueueA2A retires any already-expired pending row for the key BEFORE the +// insert, so the fresh tick buffers (and the stale row is cleaned up) instead of +// being dropped. +// +// These tests use the QueryMatcherEqual mock (setupTestDBForQueueTests) so the +// SQL strings below must match the handler's queries verbatim. + +import ( + "context" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" +) + +const ( + enqWorkspaceID = "ws-enq-expired" + enqKey = "sched-aaaa-bbbb" // schedule_id used as idempotency key + enqBody = `{"method":"message/send"}` + enqMethod = "message/send" +) + +// expectSupersedeExpired registers the cleanup UPDATE EnqueueA2A issues before +// the insert when an idempotency key is present. rowsRetired is how many expired +// pending rows the UPDATE claims to have dropped. 
+func expectSupersedeExpired(mock sqlmock.Sqlmock, workspaceID, key string, rowsRetired int64) { + mock.ExpectExec(` + UPDATE a2a_queue + SET status = 'dropped', + last_error = 'superseded: expired before drain; replaced by a fresh enqueue' + WHERE workspace_id = $1 + AND idempotency_key = $2 + AND status = 'queued' + AND expires_at IS NOT NULL + AND expires_at <= now() + `). + WithArgs(workspaceID, key). + WillReturnResult(sqlmock.NewResult(0, rowsRetired)) +} + +// expectInsert registers the INSERT ... ON CONFLICT DO NOTHING RETURNING id. +// newID is the id the insert returns (non-conflict / fresh enqueue path). +func expectInsert(mock sqlmock.Sqlmock, newID string) { + mock.ExpectQuery(` + INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key, expires_at) + VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7) + ON CONFLICT (workspace_id, idempotency_key) + WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched') + DO NOTHING + RETURNING id + `).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(newID)) +} + +// expectDepth registers the trailing queue-depth count query. +func expectDepth(mock sqlmock.Sqlmock, workspaceID string, depth int) { + mock.ExpectQuery(` + SELECT COUNT(*) FROM a2a_queue + WHERE workspace_id = $1 AND status = 'queued' + `).WithArgs(workspaceID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(depth)) +} + +// TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick is the core CR3 regression: +// an existing expired pending row for a schedule's key must NOT cause the next +// tick's enqueue to be dropped. The expired row is retired first, then the +// fresh tick inserts and returns a NEW id. +func TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + // One expired pending row exists for this key and gets retired. 
+ expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 1) + // With the active set cleared, the insert proceeds (no conflict) → new id. + const freshID = "fresh-tick-id" + expectInsert(mock, freshID) + expectDepth(mock, enqWorkspaceID, 1) + + nextRun := time.Now().Add(30 * time.Second) + id, depth, err := EnqueueA2A( + context.Background(), enqWorkspaceID, "", PriorityTask, + []byte(enqBody), enqMethod, enqKey, &nextRun, + ) + if err != nil { + t.Fatalf("EnqueueA2A returned error: %v", err) + } + if id != freshID { + t.Errorf("expected the fresh tick to enqueue with a new id %q, got %q "+ + "(an expired row must not swallow the new tick)", freshID, id) + } + if depth != 1 { + t.Errorf("expected depth 1, got %d", depth) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestEnqueueA2A_NoExpiredRow_NormalEnqueue: when no expired row exists the +// supersede UPDATE simply affects zero rows and the enqueue proceeds normally. +func TestEnqueueA2A_NoExpiredRow_NormalEnqueue(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0) // nothing to retire + const newID = "new-id" + expectInsert(mock, newID) + expectDepth(mock, enqWorkspaceID, 2) + + nextRun := time.Now().Add(30 * time.Second) + id, depth, err := EnqueueA2A( + context.Background(), enqWorkspaceID, "", PriorityTask, + []byte(enqBody), enqMethod, enqKey, &nextRun, + ) + if err != nil { + t.Fatalf("EnqueueA2A returned error: %v", err) + } + if id != newID { + t.Errorf("expected id %q, got %q", newID, id) + } + if depth != 2 { + t.Errorf("expected depth 2, got %d", depth) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestEnqueueA2A_NoKey_SkipsSupersede: with no idempotency key there is no +// active-set conflict to guard, so the supersede cleanup is skipped entirely +// and only the insert + depth queries run. 
+func TestEnqueueA2A_NoKey_SkipsSupersede(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + // No expectSupersedeExpired — it must NOT be issued when key is empty. + const newID = "no-key-id" + expectInsert(mock, newID) + expectDepth(mock, enqWorkspaceID, 1) + + id, _, err := EnqueueA2A( + context.Background(), enqWorkspaceID, "", PriorityTask, + []byte(enqBody), enqMethod, "", nil, + ) + if err != nil { + t.Fatalf("EnqueueA2A returned error: %v", err) + } + if id != newID { + t.Errorf("expected id %q, got %q", newID, id) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index f0c92b633..a454dc958 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -34,6 +34,11 @@ const ( // fireSchedule goroutine indefinitely, which blocked wg.Wait() in // tick(), which stalled the entire scheduler until operator restart. dbQueryTimeout = 10 * time.Second + // priorityTask mirrors handlers.PriorityTask (50) — the default FIFO A2A + // queue priority. Duplicated as a local const because the scheduler cannot + // import internal/handlers (handlers imports scheduler → cycle). Buffered + // cron ticks enqueue at the same priority as normal busy-retry A2A work. + priorityTask = 50 ) // sanitizeUTF8 replaces invalid UTF-8 byte sequences with the Unicode @@ -48,9 +53,14 @@ func sanitizeUTF8(s string) string { } // A2AProxy is the interface the scheduler needs to send messages to workspaces. -// WorkspaceHandler.ProxyA2ARequest satisfies this. +// WorkspaceHandler.ProxyA2ARequest + WorkspaceHandler.EnqueueA2A satisfy this. 
type A2AProxy interface { ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) + // EnqueueA2A durably buffers an A2A message for a busy workspace; the + // drain dispatches it serially when the agent frees. idempotencyKey + // collapses duplicate pending buffers per (workspace,key). Returns the + // buffered entry id, the resulting pending depth, and any error. + EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) } // Broadcaster records events and pushes them to WebSocket clients. @@ -367,33 +377,6 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { sched.WorkspaceID, ).Scan(&activeTasks, &maxConcurrent) capCancel() - if capErr == nil && activeTasks >= maxConcurrent { - log.Printf("Scheduler: '%s' workspace %s at capacity (active_tasks=%d, max=%d), deferring up to 2 min", - sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent) - // Poll every 10s for up to 2 minutes - waited := false - for i := 0; i < 12; i++ { - time.Sleep(10 * time.Second) - pollCtx, pollCancel := context.WithTimeout(ctx, dbQueryTimeout) - err := db.DB.QueryRowContext(pollCtx, - `SELECT COALESCE(active_tasks, 0), COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`, - sched.WorkspaceID, - ).Scan(&activeTasks, &maxConcurrent) - pollCancel() - if err != nil || activeTasks < maxConcurrent { - waited = true - break - } - } - if !waited && activeTasks >= maxConcurrent { - log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d, max=%d)", - sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent) - s.recordSkipped(ctx, sched, activeTasks) - return - } - log.Printf("Scheduler: '%s' workspace %s has capacity after deferral, firing", - sched.Name, short(sched.WorkspaceID, 12)) - } fireCtx, cancel := 
context.WithTimeout(ctx, fireTimeout) defer cancel() @@ -402,6 +385,9 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { // The agent sees recent peer messages before acting, enabling cross-agent // awareness without explicit A2A delegation. Best-effort — if the fetch // fails or the workspace has no Slack channels, the prompt is unchanged. + // + // Built BEFORE the capacity check so the busy-enqueue path below buffers + // the exact same A2A message the fire path would have dispatched. prompt := sched.Prompt if s.channels != nil { if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" { @@ -426,6 +412,49 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { return } + // #969 → durable buffering. When the target workspace is busy + // (active_tasks >= max_concurrent_tasks) we do NOT skip the tick and we do + // NOT block the scheduler goroutine waiting for capacity. Instead we durably + // buffer the cron message, mirroring how busy A2A dispatches already buffer. + // The drain then dispatches it serially the moment the agent frees — + // execution stays one-at-a-time; max_concurrent_tasks is unchanged. + // + // This supersedes the previous "poll then recordSkipped" behavior, which + // dropped scheduled ticks on workspaces that stayed busy across the whole + // poll window. + // + // Idempotency key = sched.ID (the SCHEDULE id), NOT msgID/a random uuid. + // Keying by schedule_id means a busy agent buffers AT MOST ONE pending tick + // per schedule — the latest one wins, the obsolete older tick is superseded — + // so we hold the next tick instead of stacking a stale backlog. + if capErr == nil && activeTasks >= maxConcurrent { + // Buffered ticks expire at the next scheduled fire: a tick that's been + // sitting in the queue past when the cron would naturally tick again is + // stale, so let it expire rather than fire late. 
Best-effort — on a bad + // cron expr we enqueue with no TTL (NULL) rather than block the tick. + var expiresAt *time.Time + if nextRun, nrErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); nrErr == nil { + expiresAt = &nextRun + } + enqCtx, enqCancel := context.WithTimeout(ctx, dbQueryTimeout) + // Empty callerID = canvas-style (source_id NULL), matching the fire path. + qID, depth, enqErr := s.proxy.EnqueueA2A(enqCtx, sched.WorkspaceID, "", priorityTask, a2aBody, "message/send", sched.ID, expiresAt) + enqCancel() + if enqErr != nil { + // Enqueue failed — fall back to recording a skip so the liveness + // view still advances and the operator sees the error, rather than + // silently dropping the tick or firing into a busy agent. + log.Printf("Scheduler: '%s' enqueue on busy workspace %s failed, recording skip: %v", + sched.Name, short(sched.WorkspaceID, 12), enqErr) + s.recordSkipped(ctx, sched, activeTasks) + return + } + log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d, max=%d) — enqueued tick %s (queue depth=%d), will drain when idle", + sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent, short(qID, 8), depth) + s.recordQueued(ctx, sched, activeTasks, qID, depth) + return + } + log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12)) // Empty callerID = canvas-style request (bypasses access control, source_id=NULL in activity log). @@ -727,6 +756,74 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active } } +// recordQueued advances next_run_at and logs a cron_run activity entry with +// status='queued' when the target workspace was busy and the tick was durably +// buffered instead of fired. Mirrors recordSkipped (#115) but records a buffer, +// not a drop: the drain will dispatch qID serially when the agent frees. 
+// next_run_at still advances so the liveness view keeps ticking and the NEXT +// cron slot enqueues (the schedule_id idempotency key then holds at most one +// pending tick — the latest — per schedule). +func (s *Scheduler) recordQueued(ctx context.Context, sched scheduleRow, activeTasks int, queueID string, depth int) { + reason := fmt.Sprintf("queued: workspace busy (active_tasks=%d), buffered (id=%s, depth=%d)", activeTasks, short(queueID, 8), depth) + + nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()) + var nextRunPtr *time.Time + if nextErr == nil { + nextRunPtr = &nextRun + } else { + // Same guard as recordSkipped/fireSchedule — preserve existing + // next_run_at rather than writing NULL on an unparseable cron expr. + log.Printf("Scheduler: ComputeNextRun error in recordQueued for '%s' (%s) — preserving existing next_run_at: %v", + sched.Name, sched.ID, nextErr) + } + + queuedUpdCtx, queuedUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if _, err := db.DB.ExecContext(queuedUpdCtx, ` + UPDATE workspace_schedules + SET last_run_at = now(), + next_run_at = COALESCE($2, next_run_at), + run_count = run_count + 1, + last_status = 'queued', + last_error = $3, + updated_at = now() + WHERE id = $1 + `, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' queued update failed: %v", sched.Name, err) + } + queuedUpdCancel() + + cronMeta, marshalErr := json.Marshal(map[string]interface{}{ + "schedule_id": sched.ID, + "schedule_name": sched.Name, + "cron_expr": sched.CronExpr, + "queued": true, + "active_tasks": activeTasks, + "queue_id": queueID, + "queue_depth": depth, + }) + if marshalErr != nil { + log.Printf("Scheduler '%s': json.Marshal cronMeta(queued) failed: %v", sched.Name, marshalErr) + } else { + queuedInsCtx, queuedInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if _, err := db.DB.ExecContext(queuedInsCtx, ` + INSERT INTO activity_logs 
(workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) + VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'queued', $4, now()) + `, sched.WorkspaceID, sanitizeUTF8("Cron queued (busy): "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' queued activity log failed: %v", sched.Name, err) + } + queuedInsCancel() + } + + if s.broadcaster != nil { + _ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{ + "schedule_id": sched.ID, + "schedule_name": sched.Name, + "reason": reason, + "queued": true, + }) + } +} + // repairNullNextRunAt is called once during Start() to recompute next_run_at // for any enabled schedule where it is NULL — a state left by the pre-#722 bug // where a ComputeNextRun error caused an UPDATE that wrote NULL. diff --git a/workspace-server/internal/scheduler/scheduler_integration_test.go b/workspace-server/internal/scheduler/scheduler_integration_test.go index 87d809d3e..cc8c90866 100644 --- a/workspace-server/internal/scheduler/scheduler_integration_test.go +++ b/workspace-server/internal/scheduler/scheduler_integration_test.go @@ -73,6 +73,14 @@ type recordingProxy struct { lastCaller string lastLogFlag bool lastWSID string + + // enqueue tracking — the busy path calls EnqueueA2A instead of firing. + enqueues int + lastEnqBody []byte + lastEnqKey string + enqQueueID string + enqDepth int + enqErr error } func (p *recordingProxy) ProxyA2ARequest( @@ -89,6 +97,25 @@ func (p *recordingProxy) ProxyA2ARequest( return p.status, p.body, nil } +// EnqueueA2A records the busy-path enqueue so tests can assert that a tick on a +// busy workspace was buffered (not fired, not skipped). 
+func (p *recordingProxy) EnqueueA2A( + _ context.Context, workspaceID, callerID string, _ int, body []byte, _ string, idempotencyKey string, _ *time.Time, +) (string, int, error) { + p.enqueues++ + p.lastWSID = workspaceID + p.lastCaller = callerID + p.lastEnqBody = body + p.lastEnqKey = idempotencyKey + if p.enqErr != nil { + return "", 0, p.enqErr + } + if p.enqQueueID == "" { + p.enqQueueID = "q-rec-1" + } + return p.enqQueueID, p.enqDepth, nil +} + // ── connection + fixture helpers ────────────────────────────────────────── // integrationDB returns the configured integration-test connection or skips diff --git a/workspace-server/internal/scheduler/scheduler_test.go b/workspace-server/internal/scheduler/scheduler_test.go index a84fcdf02..462f26f17 100644 --- a/workspace-server/internal/scheduler/scheduler_test.go +++ b/workspace-server/internal/scheduler/scheduler_test.go @@ -42,6 +42,13 @@ func (p *panicProxy) ProxyA2ARequest( panic("simulated A2A proxy panic") } +// EnqueueA2A satisfies the extended A2AProxy interface; panics like the fire path. +func (p *panicProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + panic("simulated A2A enqueue panic") +} + // ── TestLastTickAt_zero ─────────────────────────────────────────────────────── // TestLastTickAt_zero confirms that LastTickAt returns a zero time.Time on a @@ -210,6 +217,90 @@ func TestShort_helper(t *testing.T) { } // ── TestRecordSkipped_writesSkippedStatus ──────────────────────────────────── +// ── busyEnqueueProxy + TestFireSchedule_BusyEnqueuesInsteadOfSkipping ────────── +// +// Replaces the old "busy → skip after 2 min" assertion. When the workspace is +// at capacity, fireSchedule must ENQUEUE the tick into the durable a2a_queue +// (keyed by schedule_id) and record last_status='queued' — NOT fire and NOT +// recordSkipped. Proves the scheduled-tick-starvation fix. 
+ +type busyEnqueueProxy struct { + fired int + enqueued int + enqKey string + enqMethod string + enqPriority int +} + +func (p *busyEnqueueProxy) ProxyA2ARequest( + _ context.Context, _ string, _ []byte, _ string, _ bool, +) (int, []byte, error) { + p.fired++ + return 200, []byte(`{"ok":true}`), nil +} + +func (p *busyEnqueueProxy) EnqueueA2A( + _ context.Context, _ string, _ string, priority int, _ []byte, method, idempotencyKey string, _ *time.Time, +) (string, int, error) { + p.enqueued++ + p.enqKey = idempotencyKey + p.enqMethod = method + p.enqPriority = priority + return "q-busy-1", 1, nil +} + +func TestFireSchedule_BusyEnqueuesInsteadOfSkipping(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "77777777-dead-beef-0000-000000000007", + WorkspaceID: "88888888-dead-beef-0000-000000000008", + Name: "busy-enqueue-job", + CronExpr: "*/5 * * * *", + Timezone: "UTC", + Prompt: "tick while busy", + } + + // Capacity check → active_tasks(2) >= max_concurrent(1): workspace is busy. + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"active_tasks", "max"}).AddRow(2, 1)) + + // recordQueued UPDATE — binds ($1=sched.ID, $2=nextRunPtr, $3=reason); + // last_status='queued' is a SQL literal, not a bound arg. + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // recordQueued activity_logs INSERT — binds 4 args (workspace_id, summary, + // request_body, error_detail); status='queued' is a SQL literal. + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + proxy := &busyEnqueueProxy{} + s := New(proxy, nil) + s.fireSchedule(context.Background(), sched) + + if proxy.fired != 0 { + t.Errorf("busy workspace: ProxyA2ARequest must NOT fire, got %d fires", proxy.fired) + } + if proxy.enqueued != 1 { + t.Fatalf("busy workspace: expected exactly 1 EnqueueA2A, got %d", proxy.enqueued) + } + if proxy.enqKey != sched.ID { + t.Errorf("idempotency key must be schedule_id %q (buffer-latest dedup), got %q", sched.ID, proxy.enqKey) + } + if proxy.enqMethod != "message/send" { + t.Errorf("enqueued method = %q, want \"message/send\"", proxy.enqMethod) + } + if proxy.enqPriority != priorityTask { + t.Errorf("enqueued priority = %d, want priorityTask(%d)", proxy.enqPriority, priorityTask) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations — busy tick not recorded as queued: %v", err) + } +} + // #115 coverage gap: the recordSkipped path wasn't tested at all when it // first landed. Exercises the UPDATE workspace_schedules + INSERT into // activity_logs via sqlmock. Broadcaster is nil so we don't need to stub @@ -257,6 +348,13 @@ func (p *successProxy) ProxyA2ARequest( return 200, []byte(`{"ok":true}`), nil } +// EnqueueA2A satisfies the extended A2AProxy interface. +func (p *successProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + return "q-success", 1, nil +} + // ── adapterErrorProxy ───────────────────────────────────────────────────────── // adapterErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200 @@ -270,6 +368,13 @@ func (p *adapterErrorProxy) ProxyA2ARequest( return 200, []byte(`{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`), nil } +// EnqueueA2A satisfies the extended A2AProxy interface. 
+func (p *adapterErrorProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + return "q-adaptererr", 1, nil +} + // ── TestFireSchedule_AdapterSDKError (#1696) ────────────────────────────────── // // When the adapter SDK throws internally and returns HTTP 200 with an error @@ -667,6 +772,7 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) { "recordSkipped must advance next_run_at when workspace is busy (#1029)", err) } } + // trigger CI // ── TestDetectResultKind ─────────────────────────────────────────────────────── @@ -833,10 +939,10 @@ func TestDetectResultKind(t *testing.T) { // // When ProxyA2ARequest returns HTTP 200 but the response body contains a // non-ok result_kind, fireSchedule must: -// 1. Set last_status to the result_kind (not 'ok'). -// 2. Set last_error to describe the SDK error. -// 3. Increment consecutive_sdk_errors. -// 4. NOT auto-disable on first occurrence (threshold is 3). +// 1. Set last_status to the result_kind (not 'ok'). +// 2. Set last_error to describe the SDK error. +// 3. Increment consecutive_sdk_errors. +// 4. NOT auto-disable on first occurrence (threshold is 3). // // This test uses an sdkErrorProxy that returns a rate-limited body and asserts // the first run is recorded as 'rate_limited' with consecutive_sdk_errors=1 @@ -999,6 +1105,13 @@ func (p *sdkErrorProxy) ProxyA2ARequest( return 200, body, nil } +// EnqueueA2A satisfies the extended A2AProxy interface. +func (p *sdkErrorProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + return "q-sdkerr", 1, nil +} + // ── TestTruncate_utf8Safe_regression2026 ────────────────────────────────────── // TestTruncate_utf8Safe_regression2026 locks in the #2026 fix: truncate must