molecule-core/workspace-server/internal/handlers/a2a_queue.go
rabbitblood 87a97846cd feat(a2a): queue-on-busy — Phase 1 of priority queue (#1870)
## Problem

When a lead delegates to a worker that's mid-synthesis, the proxy returns
503 "workspace agent busy" and the caller records the delegation as
failed. On fan-out storms from leads this hits ~70% drop rate — today's
observed numbers in the cycle reports.

## Fix — Phase 1 TASK-level queue-on-busy

When `handleA2ADispatchError` determines the target is busy, instead of
returning 503, enqueue the request as priority=TASK and return 202
Accepted with `{queued: true, queue_id, queue_depth}`. The workspace's
next heartbeat (≤30s) drains one item if it reports spare capacity.

Files:

  - migrations/042_a2a_queue.{up,down}.sql — `a2a_queue` table with
    partial indexes on status='queued' + idempotency_key. Schema
    supports PriorityCritical/Task/Info from day one so Phase 2/3 ship
    without migration churn.

  - internal/handlers/a2a_queue.go — EnqueueA2A / DequeueNext /
    Mark*-helpers plus WorkspaceHandler.DrainQueueForWorkspace. Uses
    `SELECT ... FOR UPDATE SKIP LOCKED` so concurrent drains can't
    double-claim the same row. Max 5 attempts before marking 'failed'
    so a stuck item doesn't wedge the queue forever.

  - internal/handlers/a2a_proxy_helpers.go — isUpstreamBusyError branch
    calls EnqueueA2A and returns 202 on success. Falls through to the
    legacy 503 on enqueue error (DB hiccup shouldn't silently drop).

  - internal/handlers/registry.go — RegistryHandler gets a QueueDrainFunc
    injection hook (SetQueueDrainFunc). When Heartbeat sees
    active_tasks < max_concurrent_tasks, spawns a goroutine that calls
    the drain hook. context.WithoutCancel ensures the drain outlives
    the heartbeat handler's ctx.

  - internal/router/router.go — wires wh.DrainQueueForWorkspace into
    rh.SetQueueDrainFunc after both are constructed.

## Not in this PR (Phase 2/3/4 follow-ups)

  - INFO priority + TTL (Phase 2)
  - CRITICAL priority + soft preemption between tool calls (Phase 3)
  - Age-based promotion so TASK doesn't starve (Phase 4)
  - `GET /workspaces/:id/queue` observability endpoint

Schema already supports all of these; only the dispatch + policy code
remains.

## Tests

  - TestExtractIdempotencyKey (5 cases): messageId parsing is robust
  - TestPriorityConstants: ordering invariant + 50=TASK default
    alignment with migration DEFAULT

Full DB-touching tests (FIFO order, retry bound, idempotency conflict)
intentionally deferred to the CI migration-enabled path — sqlmock
ceremony would duplicate the existing test infrastructure 3× over and
the behaviour is directly expressible in SQL constraints (FOR UPDATE
SKIP LOCKED, partial unique index).

## Expected impact once deployed

  - a2a_receive error with "busy" flavor drops from ~69/10min observed
    today to ~0
  - delegation_failed rate drops from ~50% to <5%
  - real_output metric rises from ~30/15min back toward the pre-
    throttle baseline

Closes #1870 Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 14:09:29 -07:00

242 lines
7.8 KiB
Go

package handlers
// a2a_queue.go — #1870 Phase 1: enqueue A2A requests whose target is busy,
// drain the queue on heartbeat when the target regains capacity.
//
// Three levels are declared here so Phase 2/3 can land without a migration:
// - PriorityCritical = 100 — preempts running task (Phase 3, not active yet)
// - PriorityTask = 50 — default, FIFO within priority (Phase 1, active)
// - PriorityInfo = 10 — best-effort with TTL (Phase 2, not active yet)
//
// Phase 1 writes only PriorityTask. The `priority` column tolerates all three.
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// extractIdempotencyKey pulls params.message.messageId out of an A2A JSON-RPC
// body (normalizeA2APayload guarantees this field is set before dispatch).
// Empty string on parse failure — callers treat that as "no idempotency".
func extractIdempotencyKey(body []byte) string {
var envelope struct {
Params struct {
Message struct {
MessageID string `json:"messageId"`
} `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &envelope); err != nil {
return ""
}
return envelope.Params.Message.MessageID
}
const (
PriorityCritical = 100
PriorityTask = 50
PriorityInfo = 10
)
// QueuedItem is what the heartbeat drain path pulls off the queue.
type QueuedItem struct {
ID string
WorkspaceID string
CallerID sql.NullString
Priority int
Body []byte
Method sql.NullString
Attempts int
}
// EnqueueA2A inserts a busy-retry-eligible A2A request into a2a_queue and
// returns the new row ID + current queue depth. Caller MUST have already
// determined the target is busy — this function does not check.
//
// Idempotency: when idempotencyKey is non-empty, the partial unique index
// `idx_a2a_queue_idempotency` prevents duplicate active rows for the same
// (workspace_id, idempotency_key). On conflict this returns the existing
// row's ID so the caller's log still points at the live queue entry.
func EnqueueA2A(
ctx context.Context,
workspaceID, callerID string,
priority int,
body []byte,
method, idempotencyKey string,
) (id string, depth int, err error) {
var keyArg interface{}
if idempotencyKey != "" {
keyArg = idempotencyKey
}
var callerArg interface{}
if callerID != "" {
callerArg = callerID
}
var methodArg interface{}
if method != "" {
methodArg = method
}
// INSERT ... ON CONFLICT DO NOTHING RETURNING id. On conflict we then
// look up the existing row's id so the caller always receives a valid
// queue entry reference.
err = db.DB.QueryRowContext(ctx, `
INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key)
VALUES ($1, $2, $3, $4::jsonb, $5, $6)
ON CONFLICT ON CONSTRAINT idx_a2a_queue_idempotency DO NOTHING
RETURNING id
`, workspaceID, callerArg, priority, string(body), methodArg, keyArg).Scan(&id)
if errors.Is(err, sql.ErrNoRows) && idempotencyKey != "" {
// Conflict — look up the existing active row and use its id.
err = db.DB.QueryRowContext(ctx, `
SELECT id FROM a2a_queue
WHERE workspace_id = $1 AND idempotency_key = $2
AND status IN ('queued','dispatched')
LIMIT 1
`, workspaceID, idempotencyKey).Scan(&id)
if err != nil {
return "", 0, err
}
} else if err != nil {
return "", 0, err
}
// Return current queue depth for the caller's visibility.
_ = db.DB.QueryRowContext(ctx, `
SELECT COUNT(*) FROM a2a_queue
WHERE workspace_id = $1 AND status = 'queued'
`, workspaceID).Scan(&depth)
log.Printf("A2AQueue: enqueued %s for workspace %s (priority=%d, depth=%d)", id, workspaceID, priority, depth)
return id, depth, nil
}
// DequeueNext claims the next queued item for a workspace and marks it
// 'dispatched'. Uses SELECT ... FOR UPDATE SKIP LOCKED so two concurrent
// drain calls don't both claim the same row.
//
// Returns (nil, nil) when the queue is empty — not an error.
func DequeueNext(ctx context.Context, workspaceID string) (*QueuedItem, error) {
tx, err := db.DB.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer func() { _ = tx.Rollback() }()
var item QueuedItem
var body string
err = tx.QueryRowContext(ctx, `
SELECT id, workspace_id, caller_id, priority, body::text, method, attempts
FROM a2a_queue
WHERE workspace_id = $1 AND status = 'queued'
AND (expires_at IS NULL OR expires_at > now())
ORDER BY priority DESC, enqueued_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
`, workspaceID).Scan(
&item.ID, &item.WorkspaceID, &item.CallerID, &item.Priority,
&body, &item.Method, &item.Attempts,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, err
}
item.Body = []byte(body)
if _, err := tx.ExecContext(ctx, `
UPDATE a2a_queue
SET status = 'dispatched', dispatched_at = now(), attempts = attempts + 1
WHERE id = $1
`, item.ID); err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return &item, nil
}
// MarkQueueItemCompleted flips the queue row to 'completed' on a successful
// drain dispatch.
func MarkQueueItemCompleted(ctx context.Context, id string) {
if _, err := db.DB.ExecContext(ctx,
`UPDATE a2a_queue SET status = 'completed', completed_at = now() WHERE id = $1`, id,
); err != nil {
log.Printf("A2AQueue: failed to mark %s completed: %v", id, err)
}
}
// MarkQueueItemFailed returns a dispatched item back to 'queued' with an
// incremented attempts counter so the next drain tick picks it up. Hits
// an upper bound (5 attempts) to avoid wedging a stuck item in the queue
// forever.
func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
const maxAttempts = 5
if _, err := db.DB.ExecContext(ctx, `
UPDATE a2a_queue
SET status = CASE WHEN attempts >= $2 THEN 'failed' ELSE 'queued' END,
last_error = $3,
dispatched_at = NULL
WHERE id = $1
`, id, maxAttempts, errMsg); err != nil {
log.Printf("A2AQueue: failed to mark %s failed: %v", id, err)
}
}
// QueueDepth returns the number of currently-queued (not dispatched/completed)
// items for a workspace. Used by the busy-return response body so callers
// can see how many ahead of them.
func QueueDepth(ctx context.Context, workspaceID string) int {
var n int
_ = db.DB.QueryRowContext(ctx,
`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`,
workspaceID,
).Scan(&n)
return n
}
// DrainQueueForWorkspace pulls one queued item and dispatches it via the
// same ProxyA2ARequest path a live caller would use. Idempotent and
// concurrency-safe — multiple concurrent calls for the same workspace are
// each claim-guarded by SELECT ... FOR UPDATE SKIP LOCKED in DequeueNext.
//
// Called from the Heartbeat handler's goroutine when the workspace reports
// spare capacity. Errors here are logged but not returned — the caller is
// a fire-and-forget goroutine.
func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspaceID string) {
item, err := DequeueNext(ctx, workspaceID)
if err != nil {
log.Printf("A2AQueue drain: dequeue failed for %s: %v", workspaceID, err)
return
}
if item == nil {
return // queue empty, no work
}
callerID := ""
if item.CallerID.Valid {
callerID = item.CallerID.String
}
// logActivity=false: the original EnqueueA2A callsite already logged
// the dispatch attempt; re-logging here would double-count events.
_, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
if proxyErr != nil {
MarkQueueItemFailed(ctx, item.ID, proxyErr.Response["error"].(string))
log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %v",
item.ID, item.Attempts, proxyErr.Response["error"])
return
}
MarkQueueItemCompleted(ctx, item.ID)
log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
item.ID, workspaceID, item.Attempts)
}