feat(workspaces): wire max_concurrent_tasks from template config.yaml (#1408)

Phase 4 of #1408 (active_tasks counter). Runtime increment/decrement,
schema column (037), and scheduler enforcement (scheduler.go:312)
already shipped — but the write path from template config.yaml +
direct API was missing, so every workspace silently fell through to
the schema default of 1. Leaders that set max_concurrent_tasks: 3 in
their org template were getting 1 anyway, defeating the entire
feature for the use case it was built for (cron-vs-A2A contention on
PM/lead workspaces).

- OrgWorkspace gains MaxConcurrentTasks (yaml + json tags)
- CreateWorkspacePayload gains MaxConcurrentTasks (json tag)
- Both INSERTs now write the column unconditionally; 0/omitted
  payload value falls back to 1 (schema default mirror) so the wire
  stays single-shape — no forked column list / goto.
- Existing Create-handler test mocks updated to expect the 11th arg.
- New TestWorkspaceCreate_MaxConcurrentTasksOverride locks the
  payload→DB propagation for the leader case (value=3).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
rabbitblood 2026-04-26 11:03:01 -07:00
parent 5b346ab3e7
commit ad5295cd8a
6 changed files with 80 additions and 11 deletions

View File

@ -31,7 +31,7 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) {
mock.ExpectBegin()
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil)).
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), 1).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -66,7 +66,7 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 1).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -277,6 +277,45 @@ func TestWorkspaceList_WithData(t *testing.T) {
}
}
// ---------- workspace.go: Create with explicit max_concurrent_tasks (#1408) ----------
// Locks the wire-up that lets a leader workspace's template config.yaml
// (or a direct API caller) set max_concurrent_tasks > 1 so an in-flight
// cron doesn't reject incoming A2A delegations. Schema default is 1; a
// non-zero payload value must reach the INSERT verbatim.
func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mock.ExpectBegin()
// 11th arg = max_concurrent_tasks; payload says 3, must propagate.
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"name":"Leader Agent","runtime":"claude-code","max_concurrent_tasks":3}`
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Errorf("expected 201, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// ---------- registry.go: Register with provisioner URL preserved ----------
func TestRegister_ProvisionerURLPreserved(t *testing.T) {

View File

@ -290,8 +290,10 @@ func TestWorkspaceCreate(t *testing.T) {
// Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace).
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
// 11th arg: max_concurrent_tasks defaults to 1 when payload omits it
// (see workspace.go Create handler — schema default mirror, #1408).
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 1).
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect transaction commit (no secrets in this payload)

View File

@ -287,8 +287,16 @@ type OrgWorkspace struct {
// InitialMemories are memories seeded into this workspace at creation
// time. If empty, defaults.initial_memories are used. Issue #1050.
InitialMemories []models.MemorySeed `yaml:"initial_memories" json:"initial_memories"`
Schedules []OrgSchedule `yaml:"schedules" json:"schedules"`
Channels []OrgChannel `yaml:"channels" json:"channels"`
// MaxConcurrentTasks lets a leader workspace handle multiple A2A
// messages + cron fires concurrently (#1408). Default 1 keeps the
// classic worker-style serialised execution; bump to 3 for leaders
// (PM, leads with 5-min orchestrator pulses) so an in-flight cron
// doesn't reject incoming A2A delegations. Wired into the workspaces
// table at template-import time + read by the scheduler's capacity
// check (workspace-server/internal/scheduler/scheduler.go).
MaxConcurrentTasks int `yaml:"max_concurrent_tasks" json:"max_concurrent_tasks"`
Schedules []OrgSchedule `yaml:"schedules" json:"schedules"`
Channels []OrgChannel `yaml:"channels" json:"channels"`
External bool `yaml:"external" json:"external"`
URL string `yaml:"url" json:"url"`
Canvas struct {

View File

@ -103,10 +103,17 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
// (see canvas-topology.ts), so imports don't spray the viewport.
initialCollapsed := false
// max_concurrent_tasks from org template (#1408). 0/omitted → schema
// default of 1. Leaders typically set 3 in their org.yaml so an
// in-flight cron doesn't reject incoming A2A delegations.
maxConcurrent := ws.MaxConcurrentTasks
if maxConcurrent <= 0 {
maxConcurrent = 1
}
_, err := db.DB.ExecContext(ctx, `
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`, id, ws.Name, role, tier, runtime, awarenessNS, "provisioning", parentID, workspaceDir, workspaceAccess)
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, max_concurrent_tasks)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
`, id, ws.Name, role, tier, runtime, awarenessNS, "provisioning", parentID, workspaceDir, workspaceAccess, maxConcurrent)
if err != nil {
log.Printf("Org import: failed to create %s: %v", ws.Name, err)
return fmt.Errorf("failed to create %s: %w", ws.Name, err)

View File

@ -210,11 +210,18 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
return
}
// max_concurrent_tasks: payload-supplied → write as-is; 0/omitted →
// fall back to the schema default (1). Single INSERT shape regardless,
// avoids a forked column list. (#1408)
maxConcurrent := payload.MaxConcurrentTasks
if maxConcurrent <= 0 {
maxConcurrent = 1
}
// Insert workspace with runtime persisted in DB (inside transaction)
_, err := tx.ExecContext(ctx, `
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit)
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10)
`, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit)
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks)
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11)
`, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent)
if err != nil {
tx.Rollback() //nolint:errcheck
log.Printf("Create workspace error: %v", err)

View File

@ -85,6 +85,12 @@ type CreateWorkspacePayload struct {
// workspace secrets at creation time. Stored encrypted (same path as
// POST /workspaces/:id/secrets). Nil/empty map is a no-op.
Secrets map[string]string `json:"secrets"`
// MaxConcurrentTasks caps how many A2A messages + cron fires the
// scheduler will dispatch in parallel for this workspace (#1408).
// 0 = use the schema default of 1 (serialised, worker-style).
// Set to 3 for leaders with frequent orchestrator pulses so an
// in-flight cron doesn't reject incoming A2A delegations.
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
Canvas struct {
X float64 `json:"x"`
Y float64 `json:"y"`