## Problem
When a lead delegates to a worker that's mid-synthesis, the proxy returns
503 "workspace agent busy" and the caller records the delegation as
failed. During fan-out storms from leads, the drop rate reaches ~70%
(today's observed numbers in the cycle reports).
## Fix — Phase 1 TASK-level queue-on-busy
When `handleA2ADispatchError` determines the target is busy, instead of
returning 503, enqueue the request as priority=TASK and return 202
Accepted with `{queued: true, queue_id, queue_depth}`. The workspace's
next heartbeat (≤30s) drains one item if it reports spare capacity.
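
The busy branch described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `enqueueFunc`, `respondBusy`, `statusFor`, and the two sample enqueuers are hypothetical names, and `queue_id` is assumed to be a string (the real column type may differ).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// queuedResponse mirrors the 202 body {queued, queue_id, queue_depth}.
type queuedResponse struct {
	Queued     bool   `json:"queued"`
	QueueID    string `json:"queue_id"` // assumed string; could be a UUID or integer
	QueueDepth int    `json:"queue_depth"`
}

// enqueueFunc is a hypothetical stand-in for EnqueueA2A; the real signature may differ.
type enqueueFunc func(workspaceID string, body []byte) (queueID string, depth int, err error)

// respondBusy returns 202 with queue info when the enqueue succeeds,
// and falls through to the legacy 503 when it fails.
func respondBusy(w http.ResponseWriter, workspaceID string, body []byte, enqueue enqueueFunc) {
	id, depth, err := enqueue(workspaceID, body)
	if err != nil {
		http.Error(w, "workspace agent busy", http.StatusServiceUnavailable)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	_ = json.NewEncoder(w).Encode(queuedResponse{Queued: true, QueueID: id, QueueDepth: depth})
}

// Sample enqueuers for the demo: one succeeds, one simulates a DB error.
func okEnqueue(_ string, _ []byte) (string, int, error)   { return "q-123", 3, nil }
func failEnqueue(_ string, _ []byte) (string, int, error) { return "", 0, errors.New("db down") }

// statusFor runs respondBusy against an in-memory recorder.
func statusFor(enqueue enqueueFunc) (int, string) {
	rec := httptest.NewRecorder()
	respondBusy(rec, "ws-1", []byte(`{}`), enqueue)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := statusFor(okEnqueue)
	fmt.Print(code, " ", body)
	code, _ = statusFor(failEnqueue)
	fmt.Println(code)
}
```

Keeping the 503 as the fallback means callers that already handle "busy" keep working unchanged when the queue itself is unavailable.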
Files:
- migrations/042_a2a_queue.{up,down}.sql — `a2a_queue` table with
partial indexes on status='queued' + idempotency_key. Schema
supports PriorityCritical/Task/Info from day one so Phase 2/3 ship
without migration churn.
- internal/handlers/a2a_queue.go — EnqueueA2A / DequeueNext /
Mark* helpers plus WorkspaceHandler.DrainQueueForWorkspace. Uses
`SELECT ... FOR UPDATE SKIP LOCKED` so concurrent drains can't
double-claim the same row. Max 5 attempts before marking 'failed'
so a stuck item doesn't wedge the queue forever.
- internal/handlers/a2a_proxy_helpers.go — isUpstreamBusyError branch
calls EnqueueA2A and returns 202 on success. Falls through to the
legacy 503 on enqueue error (a DB hiccup shouldn't silently drop the request).
- internal/handlers/registry.go — RegistryHandler gets a QueueDrainFunc
injection hook (SetQueueDrainFunc). When Heartbeat sees
active_tasks < max_concurrent_tasks, it spawns a goroutine that calls
the drain hook. context.WithoutCancel ensures the drain outlives
the heartbeat handler's ctx.
- internal/router/router.go — wires wh.DrainQueueForWorkspace into
rh.SetQueueDrainFunc after both are constructed.
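
The heartbeat-triggered drain from the registry.go item can be illustrated with a small sketch. It assumes Go 1.21+ for `context.WithoutCancel`; the `QueueDrainFunc` signature, the `onHeartbeat` method, the `wg` field, and `simulate` are hypothetical and exist only to make the example runnable.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// QueueDrainFunc matches the hook name above; this signature is an assumption.
type QueueDrainFunc func(ctx context.Context, workspaceID string)

type RegistryHandler struct {
	drain QueueDrainFunc
	wg    sync.WaitGroup // demo-only: lets the caller wait for the goroutine
}

func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) { h.drain = f }

// onHeartbeat starts a drain when the workspace reports spare capacity.
// It returns true if a drain goroutine was started.
func (h *RegistryHandler) onHeartbeat(ctx context.Context, workspaceID string, activeTasks, maxConcurrent int) bool {
	if h.drain == nil || activeTasks >= maxConcurrent {
		return false
	}
	// Detach from the request's cancellation so the drain can outlive the handler.
	drainCtx := context.WithoutCancel(ctx)
	h.wg.Add(1)
	go func() {
		defer h.wg.Done()
		h.drain(drainCtx, workspaceID)
	}()
	return true
}

// simulate runs one heartbeat, cancels the request context, and only then
// lets the drain inspect its own context.
func simulate(active, limit int) (started bool, ctxErr error) {
	h := &RegistryHandler{}
	gate := make(chan struct{})
	h.SetQueueDrainFunc(func(ctx context.Context, _ string) {
		<-gate // wait until the request context has been cancelled
		ctxErr = ctx.Err()
	})
	reqCtx, cancel := context.WithCancel(context.Background())
	started = h.onHeartbeat(reqCtx, "ws-1", active, limit)
	cancel()
	close(gate)
	h.wg.Wait()
	return started, ctxErr
}

func main() {
	started, err := simulate(1, 2)
	fmt.Println(started, err) // spare capacity: drain runs with a live context
	started, _ = simulate(2, 2)
	fmt.Println(started) // at capacity: no drain
}
```

Injecting the drain as a function value presumably lets the registry handler call into the workspace handler without importing it directly, which is why the wiring happens in router.go.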
## Not in this PR (Phase 2/3/4 follow-ups)
- INFO priority + TTL (Phase 2)
- CRITICAL priority + soft preemption between tool calls (Phase 3)
- Age-based promotion so TASK doesn't starve (Phase 4)
- `GET /workspaces/:id/queue` observability endpoint
Schema already supports all of these; only the dispatch + policy code
remains.
## Tests
- TestExtractIdempotencyKey (5 cases): messageId parsing is robust
- TestPriorityConstants: ordering invariant + 50=TASK default
alignment with migration DEFAULT
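
For readers without the diff open, the two unit-tested pieces might look roughly like the sketch below. Only `PriorityTask = 50` comes from this description. The other two values, the "higher number wins" ordering, and the `params.message.messageId` location (taken from the usual A2A JSON-RPC message shape) are assumptions, and `extractIdempotencyKey` is a hypothetical reimplementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

const (
	PriorityCritical = 100 // hypothetical value
	PriorityTask     = 50  // matches the migration DEFAULT per the description
	PriorityInfo     = 10  // hypothetical value
)

// extractIdempotencyKey returns params.message.messageId from a JSON-RPC
// body, or "" when the body is malformed or the field is missing or not a string.
func extractIdempotencyKey(body []byte) string {
	var req struct {
		Params struct {
			Message struct {
				MessageID string `json:"messageId"`
			} `json:"message"`
		} `json:"params"`
	}
	if err := json.Unmarshal(body, &req); err != nil {
		return ""
	}
	return strings.TrimSpace(req.Params.Message.MessageID)
}

func main() {
	body := []byte(`{"jsonrpc":"2.0","params":{"message":{"messageId":"m-1"}}}`)
	fmt.Println(extractIdempotencyKey(body))
	fmt.Println(PriorityCritical > PriorityTask && PriorityTask > PriorityInfo)
}
```

Returning an empty key rather than an error keeps enqueueing possible for callers that send no messageId; such requests simply get no deduplication.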
Full DB-touching tests (FIFO order, retry bound, idempotency conflict)
are intentionally deferred to the CI migration-enabled path: the sqlmock
ceremony would duplicate the existing test infrastructure 3× over, and
the behaviour is directly expressible in SQL (FOR UPDATE SKIP LOCKED,
partial unique index).
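
As a rough picture of what those deferred tests would exercise, here is a sketch of a claim query using the common PostgreSQL `FOR UPDATE SKIP LOCKED` pattern plus the 5-attempt bound. The column names (`workspace_id`, `payload`, `attempts`, `created_at`), the `'dispatching'` status, and the helper names are assumptions; only the table name, the `'queued'` and `'failed'` statuses, and the attempt limit come from this description.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

const maxAttempts = 5 // bound stated in the description

// dequeueSQL claims one queued row. SKIP LOCKED makes a concurrent drain
// skip a row that another transaction has already locked instead of blocking.
const dequeueSQL = `
UPDATE a2a_queue
SET status = 'dispatching', attempts = attempts + 1
WHERE id = (
    SELECT id FROM a2a_queue
    WHERE workspace_id = $1 AND status = 'queued'
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload, attempts`

type queueItem struct {
	ID       string
	Payload  []byte
	Attempts int
}

// dequeueNext returns the claimed item, or nil when nothing is queued.
func dequeueNext(ctx context.Context, db *sql.DB, workspaceID string) (*queueItem, error) {
	var it queueItem
	err := db.QueryRowContext(ctx, dequeueSQL, workspaceID).Scan(&it.ID, &it.Payload, &it.Attempts)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &it, nil
}

// statusAfterFailure decides where an item goes after a failed dispatch:
// back to the queue, or to 'failed' once the attempt bound is reached.
func statusAfterFailure(attempts int) string {
	if attempts >= maxAttempts {
		return "failed"
	}
	return "queued"
}

func main() {
	// No database is opened here; only the retry rule is exercised.
	fmt.Println(statusAfterFailure(4), statusAfterFailure(5))
}
```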
## Expected impact once deployed
- a2a_receive error with "busy" flavor drops from ~69/10min observed
today to ~0
- delegation_failed rate drops from ~50% to <5%
- real_output metric rises from ~30/15min back toward the
pre-throttle baseline
Closes #1870 Phase 1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| File | | |
|---|---|---|
| 001_workspaces.sql | ||
| 002_agents.sql | ||
| 003_events.sql | ||
| 004_secrets.sql | ||
| 005_canvas_layouts.sql | ||
| 006_workspace_config_memory.sql | ||
| 007_approvals.sql | ||
| 008_agent_memories.sql | ||
| 009_activity_logs.sql | ||
| 010_workspace_awareness.sql | ||
| 011_workspace_runtime.sql | ||
| 012_global_secrets.sql | ||
| 013_workspace_dir.sql | ||
| 014_indexes.sql | ||
| 015_workspace_schedules.sql | ||
| 016_workspace_channels.sql | ||
| 017_memories_fts_namespace.down.sql | ||
| 017_memories_fts_namespace.up.sql | ||
| 018_secrets_encryption_version.down.sql | ||
| 018_secrets_encryption_version.up.sql | ||
| 019_workspace_access.down.sql | ||
| 019_workspace_access.up.sql | ||
| 020_workspace_auth_tokens.down.sql | ||
| 020_workspace_auth_tokens.up.sql | ||
| 021_delegation_idempotency.down.sql | ||
| 021_delegation_idempotency.up.sql | ||
| 022_workspace_schedules_source.down.sql | ||
| 022_workspace_schedules_source.up.sql | ||
| 023_workspace_memory_version.down.sql | ||
| 023_workspace_memory_version.up.sql | ||
| 024_channel_budget.down.sql | ||
| 024_channel_budget.up.sql | ||
| 025_workspace_token_usage.down.sql | ||
| 025_workspace_token_usage.up.sql | ||
| 026_org_plugin_allowlist.down.sql | ||
| 026_org_plugin_allowlist.up.sql | ||
| 027_workspace_budget.down.sql | ||
| 027_workspace_budget.up.sql | ||
| 028_workspace_artifacts.down.sql | ||
| 028_workspace_artifacts.up.sql | ||
| 029_workspace_hibernation.down.sql | ||
| 029_workspace_hibernation.up.sql | ||
| 030_audit_events.down.sql | ||
| 030_audit_events.up.sql | ||
| 031_memories_pgvector.down.sql | ||
| 031_memories_pgvector.up.sql | ||
| 032_schedule_consecutive_empty.down.sql | ||
| 032_schedule_consecutive_empty.up.sql | ||
| 033_strip_crlf_cron_prompts.up.sql | ||
| 034_workspaces_last_outbound_at.up.sql | ||
| 035_org_api_tokens.down.sql | ||
| 035_org_api_tokens.up.sql | ||
| 036_org_api_tokens_org_id.down.sql | ||
| 036_org_api_tokens_org_id.up.sql | ||
| 037_max_concurrent_tasks.down.sql | ||
| 037_max_concurrent_tasks.up.sql | ||
| 038_workspace_instance_id.down.sql | ||
| 038_workspace_instance_id.up.sql | ||
| 039_activity_tool_trace.down.sql | ||
| 039_activity_tool_trace.up.sql | ||
| 040_platform_instructions.down.sql | ||
| 040_platform_instructions.up.sql | ||
| 042_a2a_queue.down.sql | ||
| 042_a2a_queue.up.sql | ||
| 20260417000000_workflow_checkpoints.down.sql | ||
| 20260417000000_workflow_checkpoints.up.sql | ||