fix(scheduler): enqueue cron ticks on busy agents instead of dropping them #2446
@@ -225,6 +225,16 @@ func (e *proxyA2AError) Error() string {
|
||||
return "proxy a2a error"
|
||||
}
|
||||
|
||||
// EnqueueA2A is a method wrapper around the package-level EnqueueA2A function so
|
||||
// that *WorkspaceHandler satisfies the scheduler's A2AProxy interface. The
|
||||
// scheduler cannot call the package function directly (it would have to import
|
||||
// internal/handlers, but handlers already imports internal/scheduler → import
|
||||
// cycle), so it goes through this method on the proxy it already holds. Used by
|
||||
// the cron scheduler to durably buffer a tick when the target workspace is busy.
|
||||
func (h *WorkspaceHandler) EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) {
|
||||
return EnqueueA2A(ctx, workspaceID, callerID, priority, body, method, idempotencyKey, expiresAt)
|
||||
}
|
||||
|
||||
// ProxyA2ARequest is the public wrapper for proxyA2ARequest, used by the
|
||||
// cron scheduler and other internal callers that need to send A2A messages
|
||||
// to workspaces programmatically (not from an HTTP handler).
|
||||
|
||||
@@ -97,10 +97,10 @@ type QueuedItem struct {
|
||||
// returns the new row ID + current queue depth. Caller MUST have already
|
||||
// determined the target is busy — this function does not check.
|
||||
//
|
||||
// Idempotency: when idempotencyKey is non-empty, the partial unique index
|
||||
// `idx_a2a_queue_idempotency` prevents duplicate active rows for the same
|
||||
// (workspace_id, idempotency_key). On conflict this returns the existing
|
||||
// row's ID so the caller's log still points at the live queue entry.
|
||||
// Idempotency: when idempotencyKey is non-empty, a duplicate active enqueue
|
||||
// for the same (workspace, key) is collapsed rather than double-buffered. On
|
||||
// a duplicate this returns the existing row's ID so the caller's log still
|
||||
// points at the live queue entry.
|
||||
func EnqueueA2A(
|
||||
ctx context.Context,
|
||||
workspaceID, callerID string,
|
||||
@@ -129,6 +129,32 @@ func EnqueueA2A(
|
||||
expiresAtArg = *expiresAt
|
||||
}
|
||||
|
||||
// Supersede any already-expired pending row for this same key before we
|
||||
// insert. The drain path skips expired pending rows, so such a row never
|
||||
// completes on its own — it lingers in the active set and would block the
|
||||
// conflict check below, silently swallowing this fresh enqueue. Retiring
|
||||
// it here (a) frees the active set so the insert below proceeds and (b)
|
||||
// cleans the stale row up so expired rows don't accumulate. Scoped to the
|
||||
// idempotency key so unrelated traffic is untouched.
|
||||
if idempotencyKey != "" {
|
||||
if _, supErr := db.DB.ExecContext(ctx, `
|
||||
UPDATE a2a_queue
|
||||
SET status = 'dropped',
|
||||
last_error = 'superseded: expired before drain; replaced by a fresh enqueue'
|
||||
WHERE workspace_id = $1
|
||||
AND idempotency_key = $2
|
||||
AND status = 'queued'
|
||||
AND expires_at IS NOT NULL
|
||||
AND expires_at <= now()
|
||||
`, workspaceID, idempotencyKey); supErr != nil {
|
||||
// Non-fatal: if the cleanup fails we still attempt the insert. Worst
|
||||
// case the conflict path returns the (stale) existing row's id, which
|
||||
// is the pre-fix behaviour — no new breakage introduced here.
|
||||
log.Printf("A2AQueue: supersede-expired cleanup failed for workspace %s key %s: %v",
|
||||
workspaceID, idempotencyKey, supErr)
|
||||
}
|
||||
}
|
||||
|
||||
// INSERT ... ON CONFLICT DO NOTHING RETURNING id. The conflict target
|
||||
// must reference the partial unique INDEX columns + WHERE clause directly
|
||||
// (Postgres can't reference partial unique indexes by name in
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
package handlers
|
||||
|
||||
// a2a_queue_enqueue_expired_test.go — regression for CR3 RC 9853.
|
||||
//
|
||||
// Bug: a pending buffered tick that expires before the drain reaches it is
|
||||
// skipped by the drain (it filters out expired pending rows) yet still occupies
|
||||
// the active set the idempotency check guards. A later tick for the SAME key
|
||||
// would then collapse onto that dead row and be silently swallowed — the exact
|
||||
// drop the busy-buffer path was built to prevent.
|
||||
//
|
||||
// Fix: EnqueueA2A retires any already-expired pending row for the key BEFORE the
|
||||
// insert, so the fresh tick buffers (and the stale row is cleaned up) instead of
|
||||
// being dropped.
|
||||
//
|
||||
// These tests use the QueryMatcherEqual mock (setupTestDBForQueueTests) so the
|
||||
// SQL strings below must match the handler's queries verbatim.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
const (
|
||||
enqWorkspaceID = "ws-enq-expired"
|
||||
enqKey = "sched-aaaa-bbbb" // schedule_id used as idempotency key
|
||||
enqBody = `{"method":"message/send"}`
|
||||
enqMethod = "message/send"
|
||||
)
|
||||
|
||||
// expectSupersedeExpired registers the cleanup UPDATE EnqueueA2A issues before
|
||||
// the insert when an idempotency key is present. rowsRetired is how many expired
|
||||
// pending rows the UPDATE claims to have dropped.
|
||||
func expectSupersedeExpired(mock sqlmock.Sqlmock, workspaceID, key string, rowsRetired int64) {
|
||||
mock.ExpectExec(`
|
||||
UPDATE a2a_queue
|
||||
SET status = 'dropped',
|
||||
last_error = 'superseded: expired before drain; replaced by a fresh enqueue'
|
||||
WHERE workspace_id = $1
|
||||
AND idempotency_key = $2
|
||||
AND status = 'queued'
|
||||
AND expires_at IS NOT NULL
|
||||
AND expires_at <= now()
|
||||
`).
|
||||
WithArgs(workspaceID, key).
|
||||
WillReturnResult(sqlmock.NewResult(0, rowsRetired))
|
||||
}
|
||||
|
||||
// expectInsert registers the INSERT ... ON CONFLICT DO NOTHING RETURNING id.
|
||||
// newID is the id the insert returns (non-conflict / fresh enqueue path).
|
||||
func expectInsert(mock sqlmock.Sqlmock, newID string) {
|
||||
mock.ExpectQuery(`
|
||||
INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key, expires_at)
|
||||
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
|
||||
ON CONFLICT (workspace_id, idempotency_key)
|
||||
WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched')
|
||||
DO NOTHING
|
||||
RETURNING id
|
||||
`).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(newID))
|
||||
}
|
||||
|
||||
// expectDepth registers the trailing queue-depth count query.
|
||||
func expectDepth(mock sqlmock.Sqlmock, workspaceID string, depth int) {
|
||||
mock.ExpectQuery(`
|
||||
SELECT COUNT(*) FROM a2a_queue
|
||||
WHERE workspace_id = $1 AND status = 'queued'
|
||||
`).WithArgs(workspaceID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(depth))
|
||||
}
|
||||
|
||||
// TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick is the core CR3 regression:
|
||||
// an existing expired pending row for a schedule's key must NOT cause the next
|
||||
// tick's enqueue to be dropped. The expired row is retired first, then the
|
||||
// fresh tick inserts and returns a NEW id.
|
||||
func TestEnqueueA2A_ExpiredRowDoesNotBlockFreshTick(t *testing.T) {
|
||||
mock := setupTestDBForQueueTests(t)
|
||||
|
||||
// One expired pending row exists for this key and gets retired.
|
||||
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 1)
|
||||
// With the active set cleared, the insert proceeds (no conflict) → new id.
|
||||
const freshID = "fresh-tick-id"
|
||||
expectInsert(mock, freshID)
|
||||
expectDepth(mock, enqWorkspaceID, 1)
|
||||
|
||||
nextRun := time.Now().Add(30 * time.Second)
|
||||
id, depth, err := EnqueueA2A(
|
||||
context.Background(), enqWorkspaceID, "", PriorityTask,
|
||||
[]byte(enqBody), enqMethod, enqKey, &nextRun,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("EnqueueA2A returned error: %v", err)
|
||||
}
|
||||
if id != freshID {
|
||||
t.Errorf("expected the fresh tick to enqueue with a new id %q, got %q "+
|
||||
"(an expired row must not swallow the new tick)", freshID, id)
|
||||
}
|
||||
if depth != 1 {
|
||||
t.Errorf("expected depth 1, got %d", depth)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueA2A_NoExpiredRow_NormalEnqueue: when no expired row exists the
|
||||
// supersede UPDATE simply affects zero rows and the enqueue proceeds normally.
|
||||
func TestEnqueueA2A_NoExpiredRow_NormalEnqueue(t *testing.T) {
|
||||
mock := setupTestDBForQueueTests(t)
|
||||
|
||||
expectSupersedeExpired(mock, enqWorkspaceID, enqKey, 0) // nothing to retire
|
||||
const newID = "new-id"
|
||||
expectInsert(mock, newID)
|
||||
expectDepth(mock, enqWorkspaceID, 2)
|
||||
|
||||
nextRun := time.Now().Add(30 * time.Second)
|
||||
id, depth, err := EnqueueA2A(
|
||||
context.Background(), enqWorkspaceID, "", PriorityTask,
|
||||
[]byte(enqBody), enqMethod, enqKey, &nextRun,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("EnqueueA2A returned error: %v", err)
|
||||
}
|
||||
if id != newID {
|
||||
t.Errorf("expected id %q, got %q", newID, id)
|
||||
}
|
||||
if depth != 2 {
|
||||
t.Errorf("expected depth 2, got %d", depth)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueA2A_NoKey_SkipsSupersede: with no idempotency key there is no
|
||||
// active-set conflict to guard, so the supersede cleanup is skipped entirely
|
||||
// and only the insert + depth queries run.
|
||||
func TestEnqueueA2A_NoKey_SkipsSupersede(t *testing.T) {
|
||||
mock := setupTestDBForQueueTests(t)
|
||||
|
||||
// No expectSupersedeExpired — it must NOT be issued when key is empty.
|
||||
const newID = "no-key-id"
|
||||
expectInsert(mock, newID)
|
||||
expectDepth(mock, enqWorkspaceID, 1)
|
||||
|
||||
id, _, err := EnqueueA2A(
|
||||
context.Background(), enqWorkspaceID, "", PriorityTask,
|
||||
[]byte(enqBody), enqMethod, "", nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("EnqueueA2A returned error: %v", err)
|
||||
}
|
||||
if id != newID {
|
||||
t.Errorf("expected id %q, got %q", newID, id)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,11 @@ const (
|
||||
// fireSchedule goroutine indefinitely, which blocked wg.Wait() in
|
||||
// tick(), which stalled the entire scheduler until operator restart.
|
||||
dbQueryTimeout = 10 * time.Second
|
||||
// priorityTask mirrors handlers.PriorityTask (50) — the default FIFO A2A
|
||||
// queue priority. Duplicated as a local const because the scheduler cannot
|
||||
// import internal/handlers (handlers imports scheduler → cycle). Buffered
|
||||
// cron ticks enqueue at the same priority as normal busy-retry A2A work.
|
||||
priorityTask = 50
|
||||
)
|
||||
|
||||
// sanitizeUTF8 replaces invalid UTF-8 byte sequences with the Unicode
|
||||
@@ -48,9 +53,14 @@ func sanitizeUTF8(s string) string {
|
||||
}
|
||||
|
||||
// A2AProxy is the interface the scheduler needs to send messages to workspaces.
|
||||
// WorkspaceHandler.ProxyA2ARequest satisfies this.
|
||||
// WorkspaceHandler.ProxyA2ARequest + WorkspaceHandler.EnqueueA2A satisfy this.
|
||||
type A2AProxy interface {
|
||||
ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error)
|
||||
// EnqueueA2A durably buffers an A2A message for a busy workspace; the
|
||||
// drain dispatches it serially when the agent frees. idempotencyKey
|
||||
// collapses duplicate pending buffers per (workspace,key). Returns the
|
||||
// buffered entry id, the resulting pending depth, and any error.
|
||||
EnqueueA2A(ctx context.Context, workspaceID, callerID string, priority int, body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error)
|
||||
}
|
||||
|
||||
// Broadcaster records events and pushes them to WebSocket clients.
|
||||
@@ -367,33 +377,6 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
sched.WorkspaceID,
|
||||
).Scan(&activeTasks, &maxConcurrent)
|
||||
capCancel()
|
||||
if capErr == nil && activeTasks >= maxConcurrent {
|
||||
log.Printf("Scheduler: '%s' workspace %s at capacity (active_tasks=%d, max=%d), deferring up to 2 min",
|
||||
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent)
|
||||
// Poll every 10s for up to 2 minutes
|
||||
waited := false
|
||||
for i := 0; i < 12; i++ {
|
||||
time.Sleep(10 * time.Second)
|
||||
pollCtx, pollCancel := context.WithTimeout(ctx, dbQueryTimeout)
|
||||
err := db.DB.QueryRowContext(pollCtx,
|
||||
`SELECT COALESCE(active_tasks, 0), COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`,
|
||||
sched.WorkspaceID,
|
||||
).Scan(&activeTasks, &maxConcurrent)
|
||||
pollCancel()
|
||||
if err != nil || activeTasks < maxConcurrent {
|
||||
waited = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !waited && activeTasks >= maxConcurrent {
|
||||
log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d, max=%d)",
|
||||
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent)
|
||||
s.recordSkipped(ctx, sched, activeTasks)
|
||||
return
|
||||
}
|
||||
log.Printf("Scheduler: '%s' workspace %s has capacity after deferral, firing",
|
||||
sched.Name, short(sched.WorkspaceID, 12))
|
||||
}
|
||||
|
||||
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
|
||||
defer cancel()
|
||||
@@ -402,6 +385,9 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
// The agent sees recent peer messages before acting, enabling cross-agent
|
||||
// awareness without explicit A2A delegation. Best-effort — if the fetch
|
||||
// fails or the workspace has no Slack channels, the prompt is unchanged.
|
||||
//
|
||||
// Built BEFORE the capacity check so the busy-enqueue path below buffers
|
||||
// the exact same A2A message the fire path would have dispatched.
|
||||
prompt := sched.Prompt
|
||||
if s.channels != nil {
|
||||
if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" {
|
||||
@@ -426,6 +412,49 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
return
|
||||
}
|
||||
|
||||
// #969 → durable buffering. When the target workspace is busy
|
||||
// (active_tasks >= max_concurrent_tasks) we do NOT skip the tick and we do
|
||||
// NOT block the scheduler goroutine waiting for capacity. Instead we durably
|
||||
// buffer the cron message, mirroring how busy A2A dispatches already buffer.
|
||||
// The drain then dispatches it serially the moment the agent frees —
|
||||
// execution stays one-at-a-time; max_concurrent_tasks is unchanged.
|
||||
//
|
||||
// This supersedes the previous "poll then recordSkipped" behavior, which
|
||||
// dropped scheduled ticks on workspaces that stayed busy across the whole
|
||||
// poll window.
|
||||
//
|
||||
// Idempotency key = sched.ID (the SCHEDULE id), NOT msgID/a random uuid.
|
||||
// Keying by schedule_id means a busy agent buffers AT MOST ONE pending tick
|
||||
// per schedule — the latest one wins, the obsolete newer tick is collapsed —
|
||||
// so we hold the next tick instead of stacking a stale backlog.
|
||||
if capErr == nil && activeTasks >= maxConcurrent {
|
||||
// Buffered ticks expire at the next scheduled fire: a tick that's been
|
||||
// sitting in the queue past when the cron would naturally tick again is
|
||||
// stale, so let it expire rather than fire late. Best-effort — on a bad
|
||||
// cron expr we enqueue with no TTL (NULL) rather than block the tick.
|
||||
var expiresAt *time.Time
|
||||
if nextRun, nrErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); nrErr == nil {
|
||||
expiresAt = &nextRun
|
||||
}
|
||||
enqCtx, enqCancel := context.WithTimeout(ctx, dbQueryTimeout)
|
||||
// Empty callerID = canvas-style (source_id NULL), matching the fire path.
|
||||
qID, depth, enqErr := s.proxy.EnqueueA2A(enqCtx, sched.WorkspaceID, "", priorityTask, a2aBody, "message/send", sched.ID, expiresAt)
|
||||
enqCancel()
|
||||
if enqErr != nil {
|
||||
// Enqueue failed — fall back to recording a skip so the liveness
|
||||
// view still advances and the operator sees the error, rather than
|
||||
// silently dropping the tick or firing into a busy agent.
|
||||
log.Printf("Scheduler: '%s' enqueue on busy workspace %s failed, recording skip: %v",
|
||||
sched.Name, short(sched.WorkspaceID, 12), enqErr)
|
||||
s.recordSkipped(ctx, sched, activeTasks)
|
||||
return
|
||||
}
|
||||
log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d, max=%d) — enqueued tick %s (queue depth=%d), will drain when idle",
|
||||
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent, short(qID, 8), depth)
|
||||
s.recordQueued(ctx, sched, activeTasks, qID, depth)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
|
||||
|
||||
// Empty callerID = canvas-style request (bypasses access control, source_id=NULL in activity log).
|
||||
@@ -727,6 +756,74 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
}
|
||||
}
|
||||
|
||||
// recordQueued advances next_run_at and logs a cron_run activity entry with
|
||||
// status='queued' when the target workspace was busy and the tick was durably
|
||||
// buffered instead of fired. Mirrors recordSkipped (#115) but records a buffer,
|
||||
// not a drop: the drain will dispatch qID serially when the agent frees.
|
||||
// next_run_at still advances so the liveness view keeps ticking and the NEXT
|
||||
// cron slot enqueues (the schedule_id idempotency key then holds at most one
|
||||
// pending tick — the latest — per schedule).
|
||||
func (s *Scheduler) recordQueued(ctx context.Context, sched scheduleRow, activeTasks int, queueID string, depth int) {
|
||||
reason := fmt.Sprintf("queued: workspace busy (active_tasks=%d), buffered (id=%s, depth=%d)", activeTasks, short(queueID, 8), depth)
|
||||
|
||||
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
|
||||
var nextRunPtr *time.Time
|
||||
if nextErr == nil {
|
||||
nextRunPtr = &nextRun
|
||||
} else {
|
||||
// Same guard as recordSkipped/fireSchedule — preserve existing
|
||||
// next_run_at rather than writing NULL on an unparseable cron expr.
|
||||
log.Printf("Scheduler: ComputeNextRun error in recordQueued for '%s' (%s) — preserving existing next_run_at: %v",
|
||||
sched.Name, sched.ID, nextErr)
|
||||
}
|
||||
|
||||
queuedUpdCtx, queuedUpdCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
if _, err := db.DB.ExecContext(queuedUpdCtx, `
|
||||
UPDATE workspace_schedules
|
||||
SET last_run_at = now(),
|
||||
next_run_at = COALESCE($2, next_run_at),
|
||||
run_count = run_count + 1,
|
||||
last_status = 'queued',
|
||||
last_error = $3,
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
`, sched.ID, nextRunPtr, sanitizeUTF8(reason)); err != nil {
|
||||
log.Printf("Scheduler: '%s' queued update failed: %v", sched.Name, err)
|
||||
}
|
||||
queuedUpdCancel()
|
||||
|
||||
cronMeta, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"cron_expr": sched.CronExpr,
|
||||
"queued": true,
|
||||
"active_tasks": activeTasks,
|
||||
"queue_id": queueID,
|
||||
"queue_depth": depth,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("Scheduler '%s': json.Marshal cronMeta(queued) failed: %v", sched.Name, marshalErr)
|
||||
} else {
|
||||
queuedInsCtx, queuedInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
|
||||
if _, err := db.DB.ExecContext(queuedInsCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'queued', $4, now())
|
||||
`, sched.WorkspaceID, sanitizeUTF8("Cron queued (busy): "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
|
||||
log.Printf("Scheduler: '%s' queued activity log failed: %v", sched.Name, err)
|
||||
}
|
||||
queuedInsCancel()
|
||||
}
|
||||
|
||||
if s.broadcaster != nil {
|
||||
_ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"reason": reason,
|
||||
"queued": true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// repairNullNextRunAt is called once during Start() to recompute next_run_at
|
||||
// for any enabled schedule where it is NULL — a state left by the pre-#722 bug
|
||||
// where a ComputeNextRun error caused an UPDATE that wrote NULL.
|
||||
|
||||
@@ -73,6 +73,14 @@ type recordingProxy struct {
|
||||
lastCaller string
|
||||
lastLogFlag bool
|
||||
lastWSID string
|
||||
|
||||
// enqueue tracking — the busy path calls EnqueueA2A instead of firing.
|
||||
enqueues int
|
||||
lastEnqBody []byte
|
||||
lastEnqKey string
|
||||
enqQueueID string
|
||||
enqDepth int
|
||||
enqErr error
|
||||
}
|
||||
|
||||
func (p *recordingProxy) ProxyA2ARequest(
|
||||
@@ -89,6 +97,25 @@ func (p *recordingProxy) ProxyA2ARequest(
|
||||
return p.status, p.body, nil
|
||||
}
|
||||
|
||||
// EnqueueA2A records the busy-path enqueue so tests can assert that a tick on a
|
||||
// busy workspace was buffered (not fired, not skipped).
|
||||
func (p *recordingProxy) EnqueueA2A(
|
||||
_ context.Context, workspaceID, callerID string, _ int, body []byte, _ string, idempotencyKey string, _ *time.Time,
|
||||
) (string, int, error) {
|
||||
p.enqueues++
|
||||
p.lastWSID = workspaceID
|
||||
p.lastCaller = callerID
|
||||
p.lastEnqBody = body
|
||||
p.lastEnqKey = idempotencyKey
|
||||
if p.enqErr != nil {
|
||||
return "", 0, p.enqErr
|
||||
}
|
||||
if p.enqQueueID == "" {
|
||||
p.enqQueueID = "q-rec-1"
|
||||
}
|
||||
return p.enqQueueID, p.enqDepth, nil
|
||||
}
|
||||
|
||||
// ── connection + fixture helpers ──────────────────────────────────────────
|
||||
|
||||
// integrationDB returns the configured integration-test connection or skips
|
||||
|
||||
@@ -42,6 +42,13 @@ func (p *panicProxy) ProxyA2ARequest(
|
||||
panic("simulated A2A proxy panic")
|
||||
}
|
||||
|
||||
// EnqueueA2A satisfies the extended A2AProxy interface; panics like the fire path.
|
||||
func (p *panicProxy) EnqueueA2A(
|
||||
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
|
||||
) (string, int, error) {
|
||||
panic("simulated A2A enqueue panic")
|
||||
}
|
||||
|
||||
// ── TestLastTickAt_zero ───────────────────────────────────────────────────────
|
||||
|
||||
// TestLastTickAt_zero confirms that LastTickAt returns a zero time.Time on a
|
||||
@@ -210,6 +217,90 @@ func TestShort_helper(t *testing.T) {
|
||||
}
|
||||
|
||||
// ── TestRecordSkipped_writesSkippedStatus ────────────────────────────────────
|
||||
// ── busyEnqueueProxy + TestFireSchedule_BusyEnqueuesInsteadOfSkipping ──────────
|
||||
//
|
||||
// Replaces the old "busy → skip after 2 min" assertion. When the workspace is
|
||||
// at capacity, fireSchedule must ENQUEUE the tick into the durable a2a_queue
|
||||
// (keyed by schedule_id) and record last_status='queued' — NOT fire and NOT
|
||||
// recordSkipped. Proves the scheduled-tick-starvation fix.
|
||||
|
||||
type busyEnqueueProxy struct {
|
||||
fired int
|
||||
enqueued int
|
||||
enqKey string
|
||||
enqMethod string
|
||||
enqPriority int
|
||||
}
|
||||
|
||||
func (p *busyEnqueueProxy) ProxyA2ARequest(
|
||||
_ context.Context, _ string, _ []byte, _ string, _ bool,
|
||||
) (int, []byte, error) {
|
||||
p.fired++
|
||||
return 200, []byte(`{"ok":true}`), nil
|
||||
}
|
||||
|
||||
func (p *busyEnqueueProxy) EnqueueA2A(
|
||||
_ context.Context, _ string, _ string, priority int, _ []byte, method, idempotencyKey string, _ *time.Time,
|
||||
) (string, int, error) {
|
||||
p.enqueued++
|
||||
p.enqKey = idempotencyKey
|
||||
p.enqMethod = method
|
||||
p.enqPriority = priority
|
||||
return "q-busy-1", 1, nil
|
||||
}
|
||||
|
||||
func TestFireSchedule_BusyEnqueuesInsteadOfSkipping(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
sched := scheduleRow{
|
||||
ID: "77777777-dead-beef-0000-000000000007",
|
||||
WorkspaceID: "88888888-dead-beef-0000-000000000008",
|
||||
Name: "busy-enqueue-job",
|
||||
CronExpr: "*/5 * * * *",
|
||||
Timezone: "UTC",
|
||||
Prompt: "tick while busy",
|
||||
}
|
||||
|
||||
// Capacity check → active_tasks(2) >= max_concurrent(1): workspace is busy.
|
||||
mock.ExpectQuery(`SELECT COALESCE`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"active_tasks", "max"}).AddRow(2, 1))
|
||||
|
||||
// recordQueued UPDATE — binds ($1=sched.ID, $2=nextRunPtr, $3=reason);
|
||||
// last_status='queued' is a SQL literal, not a bound arg.
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// recordQueued activity_logs INSERT — binds 4 args (workspace_id, summary,
|
||||
// request_body, error_detail); status='queued' is a SQL literal.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
proxy := &busyEnqueueProxy{}
|
||||
s := New(proxy, nil)
|
||||
s.fireSchedule(context.Background(), sched)
|
||||
|
||||
if proxy.fired != 0 {
|
||||
t.Errorf("busy workspace: ProxyA2ARequest must NOT fire, got %d fires", proxy.fired)
|
||||
}
|
||||
if proxy.enqueued != 1 {
|
||||
t.Fatalf("busy workspace: expected exactly 1 EnqueueA2A, got %d", proxy.enqueued)
|
||||
}
|
||||
if proxy.enqKey != sched.ID {
|
||||
t.Errorf("idempotency key must be schedule_id %q (buffer-latest dedup), got %q", sched.ID, proxy.enqKey)
|
||||
}
|
||||
if proxy.enqMethod != "message/send" {
|
||||
t.Errorf("enqueued method = %q, want \"message/send\"", proxy.enqMethod)
|
||||
}
|
||||
if proxy.enqPriority != priorityTask {
|
||||
t.Errorf("enqueued priority = %d, want priorityTask(%d)", proxy.enqPriority, priorityTask)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations — busy tick not recorded as queued: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// #115 coverage gap: the recordSkipped path wasn't tested at all when it
|
||||
// first landed. Exercises the UPDATE workspace_schedules + INSERT into
|
||||
// activity_logs via sqlmock. Broadcaster is nil so we don't need to stub
|
||||
@@ -257,6 +348,13 @@ func (p *successProxy) ProxyA2ARequest(
|
||||
return 200, []byte(`{"ok":true}`), nil
|
||||
}
|
||||
|
||||
// EnqueueA2A satisfies the extended A2AProxy interface.
|
||||
func (p *successProxy) EnqueueA2A(
|
||||
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
|
||||
) (string, int, error) {
|
||||
return "q-success", 1, nil
|
||||
}
|
||||
|
||||
// ── adapterErrorProxy ─────────────────────────────────────────────────────────
|
||||
|
||||
// adapterErrorProxy is a test double whose ProxyA2ARequest returns HTTP 200
|
||||
@@ -270,6 +368,13 @@ func (p *adapterErrorProxy) ProxyA2ARequest(
|
||||
return 200, []byte(`{"jsonrpc":"2.0","id":"cron-test-123","error":{"code":-32603,"message":"adapter SDK internal error"}}`), nil
|
||||
}
|
||||
|
||||
// EnqueueA2A satisfies the extended A2AProxy interface.
|
||||
func (p *adapterErrorProxy) EnqueueA2A(
|
||||
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
|
||||
) (string, int, error) {
|
||||
return "q-adaptererr", 1, nil
|
||||
}
|
||||
|
||||
// ── TestFireSchedule_AdapterSDKError (#1696) ──────────────────────────────────
|
||||
//
|
||||
// When the adapter SDK throws internally and returns HTTP 200 with an error
|
||||
@@ -667,6 +772,7 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) {
|
||||
"recordSkipped must advance next_run_at when workspace is busy (#1029)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// trigger CI
|
||||
|
||||
// ── TestDetectResultKind ───────────────────────────────────────────────────────
|
||||
@@ -833,10 +939,10 @@ func TestDetectResultKind(t *testing.T) {
|
||||
//
|
||||
// When ProxyA2ARequest returns HTTP 200 but the response body contains a
|
||||
// non-ok result_kind, fireSchedule must:
|
||||
// 1. Set last_status to the result_kind (not 'ok').
|
||||
// 2. Set last_error to describe the SDK error.
|
||||
// 3. Increment consecutive_sdk_errors.
|
||||
// 4. NOT auto-disable on first occurrence (threshold is 3).
|
||||
// 1. Set last_status to the result_kind (not 'ok').
|
||||
// 2. Set last_error to describe the SDK error.
|
||||
// 3. Increment consecutive_sdk_errors.
|
||||
// 4. NOT auto-disable on first occurrence (threshold is 3).
|
||||
//
|
||||
// This test uses an sdkErrorProxy that returns a rate-limited body and asserts
|
||||
// the first run is recorded as 'rate_limited' with consecutive_sdk_errors=1
|
||||
@@ -999,6 +1105,13 @@ func (p *sdkErrorProxy) ProxyA2ARequest(
|
||||
return 200, body, nil
|
||||
}
|
||||
|
||||
// EnqueueA2A satisfies the extended A2AProxy interface.
|
||||
func (p *sdkErrorProxy) EnqueueA2A(
|
||||
_ context.Context, _ string, _ string, _ int, _ []byte, _ string, _ string, _ *time.Time,
|
||||
) (string, int, error) {
|
||||
return "q-sdkerr", 1, nil
|
||||
}
|
||||
|
||||
// ── TestTruncate_utf8Safe_regression2026 ──────────────────────────────────────
|
||||
|
||||
// TestTruncate_utf8Safe_regression2026 locks in the #2026 fix: truncate must
|
||||
|
||||
Reference in New Issue
Block a user