diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go index cb638a6f..aa31930a 100644 --- a/workspace-server/internal/handlers/handlers_additional_test.go +++ b/workspace-server/internal/handlers/handlers_additional_test.go @@ -31,7 +31,7 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) { mock.ExpectBegin() // Default tier is 3 (Privileged) — see workspace.go create-handler comment. mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil)). + WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), 1). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -66,7 +66,7 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)). + WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 1). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -277,6 +277,45 @@ func TestWorkspaceList_WithData(t *testing.T) { } } +// ---------- workspace.go: Create with explicit max_concurrent_tasks (#1408) ---------- + +// Locks the wire-up that lets a leader workspace's template config.yaml +// (or a direct API caller) set max_concurrent_tasks > 1 so an in-flight +// cron doesn't reject incoming A2A delegations. Schema default is 1; a +// non-zero payload value must reach the INSERT verbatim. +func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectBegin() + // 11th arg = max_concurrent_tasks; payload says 3, must propagate. + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + mock.ExpectExec("INSERT INTO canvas_layouts"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"name":"Leader Agent","runtime":"claude-code","max_concurrent_tasks":3}` + c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Create(c) + + if w.Code != http.StatusCreated { + t.Errorf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + // ---------- registry.go: Register with provisioner URL preserved ---------- func TestRegister_ProvisionerURLPreserved(t *testing.T) { diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index c490b2be..83a91e6c 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -290,8 +290,10 @@ func TestWorkspaceCreate(t *testing.T) { // Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace). // Default tier is 3 (Privileged) — see workspace.go create-handler comment. + // 11th arg: max_concurrent_tasks defaults to 1 when payload omits it + // (see workspace.go Create handler — schema default mirror, #1408). mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)). + WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 1). WillReturnResult(sqlmock.NewResult(0, 1)) // Expect transaction commit (no secrets in this payload) diff --git a/workspace-server/internal/handlers/org.go b/workspace-server/internal/handlers/org.go index 9a7f6692..79d8a016 100644 --- a/workspace-server/internal/handlers/org.go +++ b/workspace-server/internal/handlers/org.go @@ -287,8 +287,16 @@ type OrgWorkspace struct { // InitialMemories are memories seeded into this workspace at creation // time. If empty, defaults.initial_memories are used. Issue #1050. InitialMemories []models.MemorySeed `yaml:"initial_memories" json:"initial_memories"` - Schedules []OrgSchedule `yaml:"schedules" json:"schedules"` - Channels []OrgChannel `yaml:"channels" json:"channels"` + // MaxConcurrentTasks lets a leader workspace handle multiple A2A + // messages + cron fires concurrently (#1408). Default 1 keeps the + // classic worker-style serialised execution; bump to 3 for leaders + // (PM, leads with 5-min orchestrator pulses) so an in-flight cron + // doesn't reject incoming A2A delegations. Wired into the workspaces + // table at template-import time + read by the scheduler's capacity + // check (workspace-server/internal/scheduler/scheduler.go). + MaxConcurrentTasks int `yaml:"max_concurrent_tasks" json:"max_concurrent_tasks"` + Schedules []OrgSchedule `yaml:"schedules" json:"schedules"` + Channels []OrgChannel `yaml:"channels" json:"channels"` External bool `yaml:"external" json:"external"` URL string `yaml:"url" json:"url"` Canvas struct { diff --git a/workspace-server/internal/handlers/org_import.go b/workspace-server/internal/handlers/org_import.go index 5e50e7ae..8bcb5777 100644 --- a/workspace-server/internal/handlers/org_import.go +++ b/workspace-server/internal/handlers/org_import.go @@ -103,10 +103,17 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX // (see canvas-topology.ts), so imports don't spray the viewport. initialCollapsed := false + // max_concurrent_tasks from org template (#1408). 0/omitted → schema + // default of 1. Leaders typically set 3 in their org.yaml so an + // in-flight cron doesn't reject incoming A2A delegations. + maxConcurrent := ws.MaxConcurrentTasks + if maxConcurrent <= 0 { + maxConcurrent = 1 + } _, err := db.DB.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - `, id, ws.Name, role, tier, runtime, awarenessNS, "provisioning", parentID, workspaceDir, workspaceAccess) + INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, max_concurrent_tasks) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + `, id, ws.Name, role, tier, runtime, awarenessNS, "provisioning", parentID, workspaceDir, workspaceAccess, maxConcurrent) if err != nil { log.Printf("Org import: failed to create %s: %v", ws.Name, err) return fmt.Errorf("failed to create %s: %w", ws.Name, err) diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index 13364666..9d041316 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -210,11 +210,18 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { return } + // max_concurrent_tasks: payload-supplied → write as-is; 0/omitted → + // fall back to the schema default (1). Single INSERT shape regardless, + // avoids a forked column list. (#1408) + maxConcurrent := payload.MaxConcurrentTasks + if maxConcurrent <= 0 { + maxConcurrent = 1 + } // Insert workspace with runtime persisted in DB (inside transaction) _, err := tx.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit) - VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10) - `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit) + INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks) + VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11) + `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent) if err != nil { tx.Rollback() //nolint:errcheck log.Printf("Create workspace error: %v", err) diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index 26061a1f..da71db48 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -85,6 +85,12 @@ type CreateWorkspacePayload struct { // workspace secrets at creation time. Stored encrypted (same path as // POST /workspaces/:id/secrets). Nil/empty map is a no-op. Secrets map[string]string `json:"secrets"` + // MaxConcurrentTasks caps how many A2A messages + cron fires the + // scheduler will dispatch in parallel for this workspace (#1408). + // 0 = use the schema default of 1 (serialised, worker-style). + // Set to 3 for leaders with frequent orchestrator pulses so an + // in-flight cron doesn't reject incoming A2A delegations. + MaxConcurrentTasks int `json:"max_concurrent_tasks"` Canvas struct { X float64 `json:"x"` Y float64 `json:"y"`