From 2e69e48a4e04a70359eefe4fe843659b5ee34d59 Mon Sep 17 00:00:00 2001 From: devops-engineer Date: Mon, 8 Jun 2026 21:19:23 +0000 Subject: [PATCH 1/2] fix(scheduler): enqueue cron ticks on busy agents instead of dropping them MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a workspace agent is busy (active_tasks >= max_concurrent_tasks), A2A dispatches already buffer durably into the a2a_queue table and get picked up when the agent frees. Scheduled/cron ticks did NOT: fireSchedule polled every 10s for up to 2 min and then called recordSkipped(), dropping the tick. On perpetually-busy workspaces (e.g. leaders kept busy by the Orchestrator pulse delegation chain) this dropped ~30% of scheduled fires while A2A work buffered. Now, on busy, fireSchedule ENQUEUES the cron message into the durable a2a_queue via EnqueueA2A (the same path A2A uses) with the SAME a2aBody the fire path builds, method "message/send", priority PriorityTask. The heartbeat drain then dispatches it serially when the agent frees. Execution stays one-at-a-time; max_concurrent_tasks is unchanged — this is purely about buffering ticks. Idempotency key = schedule_id (NOT a random uuid / messageId). The a2a_queue partial-unique index idx_a2a_queue_idempotency dedups on (workspace_id, idempotency_key) for status IN ('queued','dispatched'), so a busy agent buffers AT MOST ONE pending tick per schedule (the row already queued is kept; a repeat enqueue for the same schedule is a no-op) instead of stacking a stale backlog of one-tick-per-poll. We hold a single pending tick, not a pile of obsolete ones. Enqueue happens immediately on busy (the 2-min poll-wait is removed): durable buffering makes the wait pointless and the wait blocked a scheduler goroutine. Buffered ticks get expiresAt = next scheduled fire so a tick stuck past its own next cron slot expires rather than firing stale. If EnqueueA2A errors we fall back to recordSkipped so liveness still advances and the operator sees it. 
Seam: handlers imports scheduler, so scheduler cannot import handlers (cycle). The scheduler's existing A2AProxy interface (held as s.proxy, satisfied by *WorkspaceHandler) is extended with an EnqueueA2A method that delegates to the package-level handlers.EnqueueA2A — no new import, no cycle. priorityTask is a local const mirroring handlers.PriorityTask for the same reason. Adds recordQueued (mirrors recordSkipped, last_status='queued') and a fireSchedule busy-path unit test asserting enqueue-not-fire-not-skip with idempotency_key=schedule_id. All test proxy doubles gain the EnqueueA2A method. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../internal/handlers/a2a_proxy.go | 10 ++ .../internal/scheduler/scheduler.go | 157 ++++++++++++++---- .../scheduler/scheduler_integration_test.go | 27 +++ .../internal/scheduler/scheduler_test.go | 121 +++++++++++++- 4 files changed, 283 insertions(+), 32 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index b1fc19ae9..de46aa339 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -225,6 +225,16 @@ func (e *proxyA2AError) Error() string { return "proxy a2a error" } +// EnqueueA2A is a method wrapper around the package-level EnqueueA2A function so +// that *WorkspaceHandler satisfies the scheduler's A2AProxy interface. The +// scheduler cannot call the package function directly (it would have to import +// internal/handlers, but handlers already imports internal/scheduler → import +// cycle), so it goes through this method on the proxy it already holds. Used by +// the cron scheduler to durably buffer a tick when the target workspace is busy. 
+func (h *WorkspaceHandler) EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) { + return EnqueueA2A(ctx, workspaceID, callerID, priority, body, method, idempotencyKey, expiresAt) +} + // ProxyA2ARequest is the public wrapper for proxyA2ARequest, used by the // cron scheduler and other internal callers that need to send A2A messages // to workspaces programmatically (not from an HTTP handler). diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index f0c92b633..6bd70b2c2 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -34,6 +34,11 @@ const ( // fireSchedule goroutine indefinitely, which blocked wg.Wait() in // tick(), which stalled the entire scheduler until operator restart. dbQueryTimeout = 10 * time.Second + // priorityTask mirrors handlers.PriorityTask (50) — the default FIFO A2A + // queue priority. Duplicated as a local const because the scheduler cannot + // import internal/handlers (handlers imports scheduler → cycle). Buffered + // cron ticks enqueue at the same priority as normal busy-retry A2A work. + priorityTask = 50 ) // sanitizeUTF8 replaces invalid UTF-8 byte sequences with the Unicode @@ -48,9 +53,14 @@ func sanitizeUTF8(s string) string { } // A2AProxy is the interface the scheduler needs to send messages to workspaces. -// WorkspaceHandler.ProxyA2ARequest satisfies this. +// WorkspaceHandler.ProxyA2ARequest + WorkspaceHandler.EnqueueA2A satisfy this. type A2AProxy interface { ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) + // EnqueueA2A durably buffers an A2A message in the a2a_queue table for a + // busy workspace; the heartbeat drain dispatches it serially when the + // agent frees. 
idempotencyKey dedups active rows per (workspace,key). + // Returns the queue row id, the resulting queued depth, and any error. + EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) } // Broadcaster records events and pushes them to WebSocket clients. @@ -367,33 +377,6 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { sched.WorkspaceID, ).Scan(&activeTasks, &maxConcurrent) capCancel() - if capErr == nil && activeTasks >= maxConcurrent { - log.Printf("Scheduler: '%s' workspace %s at capacity (active_tasks=%d, max=%d), deferring up to 2 min", - sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent) - // Poll every 10s for up to 2 minutes - waited := false - for i := 0; i < 12; i++ { - time.Sleep(10 * time.Second) - pollCtx, pollCancel := context.WithTimeout(ctx, dbQueryTimeout) - err := db.DB.QueryRowContext(pollCtx, - `SELECT COALESCE(active_tasks, 0), COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`, - sched.WorkspaceID, - ).Scan(&activeTasks, &maxConcurrent) - pollCancel() - if err != nil || activeTasks < maxConcurrent { - waited = true - break - } - } - if !waited && activeTasks >= maxConcurrent { - log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d, max=%d)", - sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent) - s.recordSkipped(ctx, sched, activeTasks) - return - } - log.Printf("Scheduler: '%s' workspace %s has capacity after deferral, firing", - sched.Name, short(sched.WorkspaceID, 12)) - } fireCtx, cancel := context.WithTimeout(ctx, fireTimeout) defer cancel() @@ -402,6 +385,9 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { // The agent sees recent peer messages before acting, enabling cross-agent // awareness without explicit A2A delegation. 
Best-effort — if the fetch // fails or the workspace has no Slack channels, the prompt is unchanged. + // + // Built BEFORE the capacity check so the busy-enqueue path below buffers + // the exact same A2A message the fire path would have dispatched. prompt := sched.Prompt if s.channels != nil { if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" { @@ -426,6 +412,53 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { return } + // #969 → durable buffering. When the target workspace is busy + // (active_tasks >= max_concurrent_tasks) we do NOT skip the tick and we do + // NOT block the scheduler goroutine waiting for capacity. Instead we ENQUEUE + // the cron message into the durable a2a_queue, mirroring how busy A2A + // dispatches already buffer (internal/handlers/a2a_queue.go). The heartbeat + // drain then dispatches it serially the moment the agent frees — execution + // stays one-at-a-time; max_concurrent_tasks is unchanged. + // + // This supersedes the previous "poll 10s ×12 then recordSkipped" behavior, + // which dropped scheduled ticks on perpetually-busy workspaces (the + // Orchestrator pulse delegation chain kept leaders busy, ~30% cron drop). + // + // Idempotency key = sched.ID (the SCHEDULE id), NOT msgID/a random uuid. + // The a2a_queue partial-unique index idx_a2a_queue_idempotency dedups on + // (workspace_id, idempotency_key) for status IN ('queued','dispatched'). + // Keying by schedule_id means a busy agent buffers AT MOST ONE pending tick + // per schedule — the latest one wins (ON CONFLICT DO NOTHING keeps the + // already-queued row; the obsolete newer tick is dropped at the DB) — so we + // hold the next tick instead of stacking a stale backlog of one-tick-per-30s. 
+ if capErr == nil && activeTasks >= maxConcurrent { + // Buffered ticks expire at the next scheduled fire: a tick that's been + // sitting in the queue past when the cron would naturally tick again is + // stale, so let it expire rather than fire late. Best-effort — on a bad + // cron expr we enqueue with no TTL (NULL) rather than block the tick. + var expiresAt *time.Time + if nextRun, nrErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); nrErr == nil { + expiresAt = &nextRun + } + enqCtx, enqCancel := context.WithTimeout(ctx, dbQueryTimeout) + // Empty callerID = canvas-style (source_id NULL), matching the fire path. + qID, depth, enqErr := s.proxy.EnqueueA2A(enqCtx, sched.WorkspaceID, "", priorityTask, a2aBody, "message/send", sched.ID, expiresAt) + enqCancel() + if enqErr != nil { + // Enqueue failed — fall back to recording a skip so the liveness + // view still advances and the operator sees the error, rather than + // silently dropping the tick or firing into a busy agent. + log.Printf("Scheduler: '%s' enqueue on busy workspace %s failed, recording skip: %v", + sched.Name, short(sched.WorkspaceID, 12), enqErr) + s.recordSkipped(ctx, sched, activeTasks) + return + } + log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d, max=%d) — enqueued tick %s (queue depth=%d), will drain when idle", + sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent, short(qID, 8), depth) + s.recordQueued(ctx, sched, activeTasks, qID, depth) + return + } + log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12)) // Empty callerID = canvas-style request (bypasses access control, source_id=NULL in activity log). 
@@ -727,6 +760,74 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active } } +// recordQueued advances next_run_at and logs a cron_run activity entry with +// status='queued' when the target workspace was busy and the tick was buffered +// into the durable a2a_queue instead of fired. Mirrors recordSkipped (#115) but +// records a buffer, not a drop: the heartbeat drain will dispatch qID serially +// when the agent frees. next_run_at still advances so the liveness view keeps +// ticking and the NEXT cron slot enqueues (the schedule_id idempotency key then +// holds at most one pending tick — the latest — per schedule). +func (s *Scheduler) recordQueued(ctx context.Context, sched scheduleRow, activeTasks int, queueID string, depth int) { + reason := fmt.Sprintf("queued: workspace busy (active_tasks=%d), buffered to a2a_queue (id=%s, depth=%d)", activeTasks, short(queueID, 8), depth) + + nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()) + var nextRunPtr *time.Time + if nextErr == nil { + nextRunPtr = &nextRun + } else { + // Same guard as recordSkipped/fireSchedule — preserve existing + // next_run_at rather than writing NULL on an unparseable cron expr. 
+ log.Printf("Scheduler: ComputeNextRun error in recordQueued for '%s' (%s) — preserving existing next_run_at: %v", + sched.Name, sched.ID, nextErr) + } + + queuedUpdCtx, queuedUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if _, err := db.DB.ExecContext(queuedUpdCtx, ` + UPDATE workspace_schedules + SET last_run_at = now(), + next_run_at = COALESCE($2, next_run_at), + run_count = run_count + 1, + last_status = 'queued', + last_error = $3, + updated_at = now() + WHERE id = $1 + `, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' queued update failed: %v", sched.Name, err) + } + queuedUpdCancel() + + cronMeta, marshalErr := json.Marshal(map[string]interface{}{ + "schedule_id": sched.ID, + "schedule_name": sched.Name, + "cron_expr": sched.CronExpr, + "queued": true, + "active_tasks": activeTasks, + "queue_id": queueID, + "queue_depth": depth, + }) + if marshalErr != nil { + log.Printf("Scheduler '%s': json.Marshal cronMeta(queued) failed: %v", sched.Name, marshalErr) + } else { + queuedInsCtx, queuedInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout) + if _, err := db.DB.ExecContext(queuedInsCtx, ` + INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at) + VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'queued', $4, now()) + `, sched.WorkspaceID, sanitizeUTF8("Cron queued (busy): "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil { + log.Printf("Scheduler: '%s' queued activity log failed: %v", sched.Name, err) + } + queuedInsCancel() + } + + if s.broadcaster != nil { + _ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{ + "schedule_id": sched.ID, + "schedule_name": sched.Name, + "reason": reason, + "queued": true, + }) + } +} + // repairNullNextRunAt is called once during Start() to recompute next_run_at // for any enabled 
schedule where it is NULL — a state left by the pre-#722 bug // where a ComputeNextRun error caused an UPDATE that wrote NULL. diff --git a/workspace-server/internal/scheduler/scheduler_integration_test.go b/workspace-server/internal/scheduler/scheduler_integration_test.go index 87d809d3e..cc8c90866 100644 --- a/workspace-server/internal/scheduler/scheduler_integration_test.go +++ b/workspace-server/internal/scheduler/scheduler_integration_test.go @@ -73,6 +73,14 @@ type recordingProxy struct { lastCaller string lastLogFlag bool lastWSID string + + // enqueue tracking — the busy path calls EnqueueA2A instead of firing. + enqueues int + lastEnqBody []byte + lastEnqKey string + enqQueueID string + enqDepth int + enqErr error } func (p *recordingProxy) ProxyA2ARequest( @@ -89,6 +97,25 @@ func (p *recordingProxy) ProxyA2ARequest( return p.status, p.body, nil } +// EnqueueA2A records the busy-path enqueue so tests can assert that a tick on a +// busy workspace was buffered (not fired, not skipped). 
+func (p *recordingProxy) EnqueueA2A( + _ context.Context, workspaceID, callerID string, _ int, body []byte, _ string, idempotencyKey string, _ *time.Time, +) (string, int, error) { + p.enqueues++ + p.lastWSID = workspaceID + p.lastCaller = callerID + p.lastEnqBody = body + p.lastEnqKey = idempotencyKey + if p.enqErr != nil { + return "", 0, p.enqErr + } + if p.enqQueueID == "" { + p.enqQueueID = "q-rec-1" + } + return p.enqQueueID, p.enqDepth, nil +} + // ── connection + fixture helpers ────────────────────────────────────────── // integrationDB returns the configured integration-test connection or skips diff --git a/workspace-server/internal/scheduler/scheduler_test.go b/workspace-server/internal/scheduler/scheduler_test.go index a84fcdf02..462f26f17 100644 --- a/workspace-server/internal/scheduler/scheduler_test.go +++ b/workspace-server/internal/scheduler/scheduler_test.go @@ -42,6 +42,13 @@ func (p *panicProxy) ProxyA2ARequest( panic("simulated A2A proxy panic") } +// EnqueueA2A satisfies the extended A2AProxy interface; panics like the fire path. +func (p *panicProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + panic("simulated A2A enqueue panic") +} + // ── TestLastTickAt_zero ─────────────────────────────────────────────────────── // TestLastTickAt_zero confirms that LastTickAt returns a zero time.Time on a @@ -210,6 +217,90 @@ func TestShort_helper(t *testing.T) { } // ── TestRecordSkipped_writesSkippedStatus ──────────────────────────────────── +// ── busyEnqueueProxy + TestFireSchedule_BusyEnqueuesInsteadOfSkipping ────────── +// +// Replaces the old "busy → skip after 2 min" assertion. When the workspace is +// at capacity, fireSchedule must ENQUEUE the tick into the durable a2a_queue +// (keyed by schedule_id) and record last_status='queued' — NOT fire and NOT +// recordSkipped. Proves the scheduled-tick-starvation fix. 
+ +type busyEnqueueProxy struct { + fired int + enqueued int + enqKey string + enqMethod string + enqPriority int +} + +func (p *busyEnqueueProxy) ProxyA2ARequest( + _ context.Context, _ string, _ []byte, _ string, _ bool, +) (int, []byte, error) { + p.fired++ + return 200, []byte(`{"ok":true}`), nil +} + +func (p *busyEnqueueProxy) EnqueueA2A( + _ context.Context, _ string, _ string, priority int, _ []byte, method, idempotencyKey string, _ *time.Time, +) (string, int, error) { + p.enqueued++ + p.enqKey = idempotencyKey + p.enqMethod = method + p.enqPriority = priority + return "q-busy-1", 1, nil +} + +func TestFireSchedule_BusyEnqueuesInsteadOfSkipping(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "77777777-dead-beef-0000-000000000007", + WorkspaceID: "88888888-dead-beef-0000-000000000008", + Name: "busy-enqueue-job", + CronExpr: "*/5 * * * *", + Timezone: "UTC", + Prompt: "tick while busy", + } + + // Capacity check → active_tasks(2) >= max_concurrent(1): workspace is busy. + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"active_tasks", "max"}).AddRow(2, 1)) + + // recordQueued UPDATE — binds ($1=sched.ID, $2=nextRunPtr, $3=reason); + // last_status='queued' is a SQL literal, not a bound arg. + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // recordQueued activity_logs INSERT — binds 4 args (workspace_id, summary, + // request_body, error_detail); status='queued' is a SQL literal. + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + proxy := &busyEnqueueProxy{} + s := New(proxy, nil) + s.fireSchedule(context.Background(), sched) + + if proxy.fired != 0 { + t.Errorf("busy workspace: ProxyA2ARequest must NOT fire, got %d fires", proxy.fired) + } + if proxy.enqueued != 1 { + t.Fatalf("busy workspace: expected exactly 1 EnqueueA2A, got %d", proxy.enqueued) + } + if proxy.enqKey != sched.ID { + t.Errorf("idempotency key must be schedule_id %q (buffer-latest dedup), got %q", sched.ID, proxy.enqKey) + } + if proxy.enqMethod != "message/send" { + t.Errorf("enqueued method = %q, want \"message/send\"", proxy.enqMethod) + } + if proxy.enqPriority != priorityTask { + t.Errorf("enqueued priority = %d, want priorityTask(%d)", proxy.enqPriority, priorityTask) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations — busy tick not recorded as queued: %v", err) + } +} + // #115 coverage gap: the recordSkipped path wasn't tested at all when it // first landed. Exercises the UPDATE workspace_schedules + INSERT into // activity_logs via sqlmock. Broadcaster is nil so we don't need to stub @@ -257,6 +348,13 @@ func (p *successProxy) ProxyA2ARequest( return 200, []byte(`{"ok":true}`), nil } +// EnqueueA2A satisfies the extended A2AProxy interface. +func (p *successProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + return "q-success", 1, nil +} + // ── adapterErrorProxy ───────────────────────────────────────────────────────── // adapterErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200 @@ -270,6 +368,13 @@ func (p *adapterErrorProxy) ProxyA2ARequest( return 200, []byte(`{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`), nil } +// EnqueueA2A satisfies the extended A2AProxy interface. 
+func (p *adapterErrorProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + return "q-adaptererr", 1, nil +} + // ── TestFireSchedule_AdapterSDKError (#1696) ────────────────────────────────── // // When the adapter SDK throws internally and returns HTTP 200 with an error @@ -667,6 +772,7 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) { "recordSkipped must advance next_run_at when workspace is busy (#1029)", err) } } + // trigger CI // ── TestDetectResultKind ─────────────────────────────────────────────────────── @@ -833,10 +939,10 @@ func TestDetectResultKind(t *testing.T) { // // When ProxyA2ARequest returns HTTP 200 but the response body contains a // non-ok result_kind, fireSchedule must: -// 1. Set last_status to the result_kind (not 'ok'). -// 2. Set last_error to describe the SDK error. -// 3. Increment consecutive_sdk_errors. -// 4. NOT auto-disable on first occurrence (threshold is 3). +// 1. Set last_status to the result_kind (not 'ok'). +// 2. Set last_error to describe the SDK error. +// 3. Increment consecutive_sdk_errors. +// 4. NOT auto-disable on first occurrence (threshold is 3). // // This test uses an sdkErrorProxy that returns a rate-limited body and asserts // the first run is recorded as 'rate_limited' with consecutive_sdk_errors=1 @@ -999,6 +1105,13 @@ func (p *sdkErrorProxy) ProxyA2ARequest( return 200, body, nil } +// EnqueueA2A satisfies the extended A2AProxy interface. 
+func (p *sdkErrorProxy) EnqueueA2A( + _ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time, +) (string, int, error) { + return "q-sdkerr", 1, nil +} + // ── TestTruncate_utf8Safe_regression2026 ────────────────────────────────────── // TestTruncate_utf8Safe_regression2026 locks in the #2026 fix: truncate must -- 2.52.0 From fb76309d84c593161f771003d8da3eeaa9aa2ffa Mon Sep 17 00:00:00 2001 From: devops-engineer Date: Mon, 8 Jun 2026 22:13:35 +0000 Subject: [PATCH 2/2] fix expired-row-conflict starvation (expired queued row no longer blocks a fresh tick's enqueue) + content-security comment generalization; refs CR3 RC 9853 Co-Authored-By: Claude Opus 4.8 (1M context) --- .../internal/handlers/a2a_queue.go | 34 +++- .../a2a_queue_enqueue_expired_test.go | 160 ++++++++++++++++++ .../internal/scheduler/scheduler.go | 44 +++-- 3 files changed, 210 insertions(+), 28 deletions(-) create mode 100644 workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go index 7997dda3e..c2c3841bd 100644 --- a/workspace-server/internal/handlers/a2a_queue.go +++ b/workspace-server/internal/handlers/a2a_queue.go @@ -97,10 +97,10 @@ type QueuedItem struct { // returns the new row ID + current queue depth. Caller MUST have already // determined the target is busy — this function does not check. // -// Idempotency: when idempotencyKey is non-empty, the partial unique index -// `idx_a2a_queue_idempotency` prevents duplicate active rows for the same -// (workspace_id, idempotency_key). On conflict this returns the existing -// row's ID so the caller's log still points at the live queue entry. +// Idempotency: when idempotencyKey is non-empty, a duplicate active enqueue +// for the same (workspace, key) is collapsed rather than double-buffered. 
On +// a duplicate this returns the existing row's ID so the caller's log still +// points at the live queue entry. func EnqueueA2A( ctx context.Context, workspaceID, callerID string, @@ -129,6 +129,32 @@ func EnqueueA2A( expiresAtArg = *expiresAt } + // Supersede any already-expired pending row for this same key before we + // insert. The drain path skips expired pending rows, so such a row never + // completes on its own — it lingers in the active set and would block the + // conflict check below, silently swallowing this fresh enqueue. Retiring + // it here (a) frees the active set so the insert below proceeds and (b) + // cleans the stale row up so expired rows don't accumulate. Scoped to the + // idempotency key so unrelated traffic is untouched. + if idempotencyKey != "" { + if _, supErr := db.DB.ExecContext(ctx, ` + UPDATE a2a_queue + SET status = 'dropped', + last_error = 'superseded: expired before drain; replaced by a fresh enqueue' + WHERE workspace_id = $1 + AND idempotency_key = $2 + AND status = 'queued' + AND expires_at IS NOT NULL + AND expires_at <= now() + `, workspaceID, idempotencyKey); supErr != nil { + // Non-fatal: if the cleanup fails we still attempt the insert. Worst + // case the conflict path returns the (stale) existing row's id, which + // is the pre-fix behaviour — no new breakage introduced here. + log.Printf("A2AQueue: supersede-expired cleanup failed for workspace %s key %s: %v", + workspaceID, idempotencyKey, supErr) + } + } + // INSERT ... ON CONFLICT DO NOTHING RETURNING id. 
The conflict target // must reference the partial unique INDEX columns + WHERE clause directly // (Postgres can't reference partial unique indexes by name in diff --git a/workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go b/workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go new file mode 100644 index 000000000..d2b1c2e6e --- /dev/null +++ b/workspace-server/internal/handlers/a2a_queue_enqueue_expired_test.go @@ -0,0 +1,160 @@ +package handlers + +// a2a_queue_enqueue_expired_test.go — regression for CR3 RC 9853. +// +// Bug: a pending buffered tick that expires before the drain reaches it is +// skipped by the drain (it filters out expired pending rows) yet still occupies +// the active set the idempotency check guards. A later tick for the SAME key +// would then collapse onto that dead row and be silently swallowed — the exact +// drop the busy-buffer path was built to prevent. +// +// Fix: EnqueueA2A retires any already-expired pending row for the key BEFORE the +// insert, so the fresh tick buffers (and the stale row is cleaned up) instead of +// being dropped. +// +// These tests use the QueryMatcherEqual mock (setupTestDBForQueueTests) so the +// SQL strings below must match the handler's queries verbatim. + +import ( + "context" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" +) + +const ( + enqWorkspaceID = "ws-enq-expired" + enqKey = "sched-aaaa-bbbb" // schedule_id used as idempotency key + enqBody = `{"method":"message/send"}` + enqMethod = "message/send" +) + +// expectSupersedeExpired registers the cleanup UPDATE EnqueueA2A issues before +// the insert when an idempotency key is present. rowsRetired is how many expired +// pending rows the UPDATE claims to have dropped. 
+func expectSupersedeExpired(mock sqlmock.Sqlmock, workspaceID, key string, rowsRetired int64) { + mock.ExpectExec(` + UPDATE a2a_queue + SET status = 'dropped', + last_error = 'superseded: expired before drain; replaced by a fresh enqueue' + WHERE workspace_id = $1 + AND idempotency_key = $2 + AND status = 'queued' + AND expires_at IS NOT NULL + AND expires_at <= now() + `). + WithArgs(workspaceID, key). + WillReturnResult(sqlmock.NewResult(0, rowsRetired)) +} + +// expectInsert registers the INSERT ... ON CONFLICT DO NOTHING RETURNING id. +// newID is the id the insert returns (non-conflict / fresh enqueue path). +func expectInsert(mock sqlmock.Sqlmock, newID string) { + mock.ExpectQuery(` + INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key, expires_at) + VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7) + ON CONFLICT (workspace_id, idempotency_key) + WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched') + DO NOTHING + RETURNING id + `).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(newID)) +} + +// expectDepth registers the trailing queue-depth count query. +func expectDepth(mock sqlmock.Sqlmock, workspaceID string, depth int) { + mock.ExpectQuery(` + SELECT COUNT(*) FROM a2a_queue + WHERE workspace_id = $1 AND status = 'queued' + `).WithArgs(workspaceID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(depth)) +} + +// TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick is the core CR3 regression: +// an existing expired pending row for a schedule's key must NOT cause the next +// tick's enqueue to be dropped. The expired row is retired first, then the +// fresh tick inserts and returns a NEW id. +func TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + // One expired pending row exists for this key and gets retired. 
+ expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 1) + // With the active set cleared, the insert proceeds (no conflict) → new id. + const freshID = "fresh-tick-id" + expectInsert(mock, freshID) + expectDepth(mock, enqWorkspaceID, 1) + + nextRun := time.Now().Add(30 * time.Second) + id, depth, err := EnqueueA2A( + context.Background(), enqWorkspaceID, "", PriorityTask, + []byte(enqBody), enqMethod, enqKey, &nextRun, + ) + if err != nil { + t.Fatalf("EnqueueA2A returned error: %v", err) + } + if id != freshID { + t.Errorf("expected the fresh tick to enqueue with a new id %q, got %q "+ + "(an expired row must not swallow the new tick)", freshID, id) + } + if depth != 1 { + t.Errorf("expected depth 1, got %d", depth) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestEnqueueA2A_NoExpiredRow_NormalEnqueue: when no expired row exists the +// supersede UPDATE simply affects zero rows and the enqueue proceeds normally. +func TestEnqueueA2A_NoExpiredRow_NormalEnqueue(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0) // nothing to retire + const newID = "new-id" + expectInsert(mock, newID) + expectDepth(mock, enqWorkspaceID, 2) + + nextRun := time.Now().Add(30 * time.Second) + id, depth, err := EnqueueA2A( + context.Background(), enqWorkspaceID, "", PriorityTask, + []byte(enqBody), enqMethod, enqKey, &nextRun, + ) + if err != nil { + t.Fatalf("EnqueueA2A returned error: %v", err) + } + if id != newID { + t.Errorf("expected id %q, got %q", newID, id) + } + if depth != 2 { + t.Errorf("expected depth 2, got %d", depth) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestEnqueueA2A_NoKey_SkipsSupersede: with no idempotency key there is no +// active-set conflict to guard, so the supersede cleanup is skipped entirely +// and only the insert + depth queries run. 
+func TestEnqueueA2A_NoKey_SkipsSupersede(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + // No expectSupersedeExpired — it must NOT be issued when key is empty. + const newID = "no-key-id" + expectInsert(mock, newID) + expectDepth(mock, enqWorkspaceID, 1) + + id, _, err := EnqueueA2A( + context.Background(), enqWorkspaceID, "", PriorityTask, + []byte(enqBody), enqMethod, "", nil, + ) + if err != nil { + t.Fatalf("EnqueueA2A returned error: %v", err) + } + if id != newID { + t.Errorf("expected id %q, got %q", newID, id) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 6bd70b2c2..a454dc958 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -56,10 +56,10 @@ func sanitizeUTF8(s string) string { // WorkspaceHandler.ProxyA2ARequest + WorkspaceHandler.EnqueueA2A satisfy this. type A2AProxy interface { ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) - // EnqueueA2A durably buffers an A2A message in the a2a_queue table for a - // busy workspace; the heartbeat drain dispatches it serially when the - // agent frees. idempotencyKey dedups active rows per (workspace,key). - // Returns the queue row id, the resulting queued depth, and any error. + // EnqueueA2A durably buffers an A2A message for a busy workspace; the + // drain dispatches it serially when the agent frees. idempotencyKey + // collapses duplicate pending buffers per (workspace,key). Returns the + // buffered entry id, the resulting pending depth, and any error. 
EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) } @@ -414,23 +414,19 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { // #969 → durable buffering. When the target workspace is busy // (active_tasks >= max_concurrent_tasks) we do NOT skip the tick and we do - // NOT block the scheduler goroutine waiting for capacity. Instead we ENQUEUE - // the cron message into the durable a2a_queue, mirroring how busy A2A - // dispatches already buffer (internal/handlers/a2a_queue.go). The heartbeat - // drain then dispatches it serially the moment the agent frees — execution - // stays one-at-a-time; max_concurrent_tasks is unchanged. + // NOT block the scheduler goroutine waiting for capacity. Instead we durably + // buffer the cron message, mirroring how busy A2A dispatches already buffer. + // The drain then dispatches it serially the moment the agent frees — + // execution stays one-at-a-time; max_concurrent_tasks is unchanged. // - // This supersedes the previous "poll 10s ×12 then recordSkipped" behavior, - // which dropped scheduled ticks on perpetually-busy workspaces (the - // Orchestrator pulse delegation chain kept leaders busy, ~30% cron drop). + // This supersedes the previous "poll then recordSkipped" behavior, which + // dropped scheduled ticks on workspaces that stayed busy across the whole + // poll window. // // Idempotency key = sched.ID (the SCHEDULE id), NOT msgID/a random uuid. - // The a2a_queue partial-unique index idx_a2a_queue_idempotency dedups on - // (workspace_id, idempotency_key) for status IN ('queued','dispatched'). 
// Keying by schedule_id means a busy agent buffers AT MOST ONE pending tick - // per schedule — the latest one wins (ON CONFLICT DO NOTHING keeps the - // already-queued row; the obsolete newer tick is dropped at the DB) — so we - // hold the next tick instead of stacking a stale backlog of one-tick-per-30s. + // per schedule — the latest one wins, the obsolete duplicate tick is collapsed — + // so we hold the next tick instead of stacking a stale backlog. if capErr == nil && activeTasks >= maxConcurrent { // Buffered ticks expire at the next scheduled fire: a tick that's been // sitting in the queue past when the cron would naturally tick again is @@ -761,14 +757,14 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active } // recordQueued advances next_run_at and logs a cron_run activity entry with -// status='queued' when the target workspace was busy and the tick was buffered -// into the durable a2a_queue instead of fired. Mirrors recordSkipped (#115) but -// records a buffer, not a drop: the heartbeat drain will dispatch qID serially -// when the agent frees. next_run_at still advances so the liveness view keeps -// ticking and the NEXT cron slot enqueues (the schedule_id idempotency key then -// holds at most one pending tick — the latest — per schedule). +// status='queued' when the target workspace was busy and the tick was durably +// buffered instead of fired. Mirrors recordSkipped (#115) but records a buffer, +// not a drop: the drain will dispatch queueID serially when the agent frees. +// next_run_at still advances so the liveness view keeps ticking and the NEXT +// cron slot enqueues (the schedule_id idempotency key then holds at most one +// pending tick — the latest — per schedule). 
func (s *Scheduler) recordQueued(ctx context.Context, sched scheduleRow, activeTasks int, queueID string, depth int) { - reason := fmt.Sprintf("queued: workspace busy (active_tasks=%d), buffered to a2a_queue (id=%s, depth=%d)", activeTasks, short(queueID, 8), depth) + reason := fmt.Sprintf("queued: workspace busy (active_tasks=%d), buffered (id=%s, depth=%d)", activeTasks, short(queueID, 8), depth) nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()) var nextRunPtr *time.Time -- 2.52.0