feat(org-import): make provision concurrency configurable via env
Org-import was hard-capped at 3 concurrent workspace provisions (#1084), calibrated for Docker-mode workspaces where each provision was a docker-run. Now that workspaces are EC2 instances, AWS RunInstances parallelises happily and the artificial cap of 3 makes a 7-workspace org-import take 3-4× longer than necessary (3 batches × ~70s/provision ≈ 4 min wall time when AWS could absorb all 7 in parallel for ~70s). This PR makes the cap configurable via MOLECULE_PROVISION_CONCURRENCY: unset → 3 (Docker-mode default, unchanged) "0" → effectively unlimited (SaaS / EC2 backend; AWS rate-limit + vCPU quota are the real backpressure) N>0 → exactly N N<0 → fall back to default 3 + warning log garbage → fall back to default 3 + warning log The "0 = unlimited" mapping is the user-facing convention requested for SaaS deployments — operators don't have to pick an arbitrary large number. Implementation hands off 1<<20 internally so the channel-based semaphore stays a no-op without infinite-buffer risk. Test coverage (org_provision_concurrency_test.go, 6 cases / 15 subtests): - unset → default - "0" → large unlimited cap - positive integer exact (1, 5, 10, 50) - negative → default + warning - non-numeric → default + warning - whitespace-trimmed (" 7 " → 7) Boot-time log line confirms the resolved cap so an operator can verify their env is being honored without re-deploying. Does NOT address the separate 600s "never registered" timeout the user also reported during org-import — that's filed as molecule-core#2793 for proper investigation (parallel-provision contention, network routing, register-retry budget, or container-start failure are all candidates and need live SSM capture to bisect). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
ed4d24fb8c
commit
6d7a7fc86f
@ -11,6 +11,8 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||||
@ -25,10 +27,62 @@ import (
|
|||||||
// during org import. Prevents overwhelming Docker when creating many containers.
|
// during org import. Prevents overwhelming Docker when creating many containers.
|
||||||
const workspaceCreatePacingMs = 2000
|
const workspaceCreatePacingMs = 2000
|
||||||
|
|
||||||
// defaultProvisionConcurrency is the fallback number of workspace-provision
// goroutines allowed to run simultaneously during org import when
// MOLECULE_PROVISION_CONCURRENCY is unset. It preserves the original hard
// cap of 3 (PR #1084) that was calibrated for Docker-mode workspaces;
// EC2/SaaS deployments, where each provision is a RunInstances call AWS
// parallelises happily, raise the cap via the env var instead
// (see resolveProvisionConcurrency below).
const defaultProvisionConcurrency = 3

// resolveProvisionConcurrency computes the effective semaphore size for
// org-import workspace provisioning from MOLECULE_PROVISION_CONCURRENCY:
//
//   - unset / empty       → defaultProvisionConcurrency (3)
//   - non-numeric         → defaultProvisionConcurrency, warning logged
//   - negative integer    → defaultProvisionConcurrency, warning logged
//     (never passed through: make(chan, n<0) would panic)
//   - "0"                 → effectively unlimited — used on SaaS where
//     AWS RunInstances rate limits and account
//     vCPU quota are the real backpressure
//   - positive integer N  → exactly N
//
// The "0 = unlimited" mapping is deliberate operator shorthand: "0" is the
// natural way to say "no cap" without typing a magic large number. The
// implementation returns a large but finite value (1<<20) so the
// channel-based semaphore stays an ordinary buffered channel that
// goroutines will never block on in practice.
func resolveProvisionConcurrency() int {
	val := strings.TrimSpace(os.Getenv("MOLECULE_PROVISION_CONCURRENCY"))
	if val == "" {
		return defaultProvisionConcurrency
	}

	parsed, parseErr := strconv.Atoi(val)
	if parseErr != nil {
		log.Printf("org_import: MOLECULE_PROVISION_CONCURRENCY=%q is not an integer; falling back to default %d",
			val, defaultProvisionConcurrency)
		return defaultProvisionConcurrency
	}

	switch {
	case parsed < 0:
		log.Printf("org_import: MOLECULE_PROVISION_CONCURRENCY=%d is negative; falling back to default %d",
			parsed, defaultProvisionConcurrency)
		return defaultProvisionConcurrency
	case parsed == 0:
		// Unlimited semantics — a large but finite cap keeps the
		// chan-based semaphore a no-op without infinite-buffer risk.
		// 1M is well past any realistic org-import size.
		return 1 << 20
	default:
		return parsed
	}
}
|
||||||
|
|
||||||
// Child grid layout constants — kept in sync with canvas-topology.ts on
|
// Child grid layout constants — kept in sync with canvas-topology.ts on
|
||||||
// the client. Children laid on import use the same 2-column grid so the
|
// the client. Children laid on import use the same 2-column grid so the
|
||||||
@ -600,8 +654,16 @@ func (h *OrgHandler) Import(c *gin.Context) {
|
|||||||
results := []map[string]interface{}{}
|
results := []map[string]interface{}{}
|
||||||
var createErr error
|
var createErr error
|
||||||
|
|
||||||
// Semaphore limits concurrent Docker provisioning (#1084).
|
// Semaphore limits concurrent provision goroutines (#1084).
|
||||||
provisionSem := make(chan struct{}, provisionConcurrency)
|
// Cap is configurable via MOLECULE_PROVISION_CONCURRENCY:
|
||||||
|
// unset → 3 (Docker-mode default)
|
||||||
|
// "0" → effectively unlimited (SaaS / EC2 backend)
|
||||||
|
// N>0 → exactly N
|
||||||
|
// See resolveProvisionConcurrency for the full env-parse contract.
|
||||||
|
concurrency := resolveProvisionConcurrency()
|
||||||
|
provisionSem := make(chan struct{}, concurrency)
|
||||||
|
log.Printf("org_import: provision concurrency cap=%d (env MOLECULE_PROVISION_CONCURRENCY=%q)",
|
||||||
|
concurrency, os.Getenv("MOLECULE_PROVISION_CONCURRENCY"))
|
||||||
|
|
||||||
// Recursively create workspaces. Root workspaces keep their YAML
|
// Recursively create workspaces. Root workspaces keep their YAML
|
||||||
// canvas coords; children are positioned by createWorkspaceTree
|
// canvas coords; children are positioned by createWorkspaceTree
|
||||||
|
|||||||
@ -0,0 +1,96 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Tests for resolveProvisionConcurrency — the env-parse contract that
|
||||||
|
// turns MOLECULE_PROVISION_CONCURRENCY into the channel-buffer size for
|
||||||
|
// the org-import provision semaphore.
|
||||||
|
//
|
||||||
|
// Why this matters: with the wrong cap, org-import either serializes
|
||||||
|
// (cap=1, slow) or stampedes the provider (cap=infinity on a backend
|
||||||
|
// that can't take it). The defaults — 3 for Docker, "0=unlimited" for
|
||||||
|
// EC2/SaaS — are what most operators want; the parse logic exists to
|
||||||
|
// route the env var to the right behavior without surprise.
|
||||||
|
//
|
||||||
|
// The "0 → unlimited" mapping is the user-facing piece worth pinning
|
||||||
|
// in tests: easy to misread as "0 means stop entirely" if someone
|
||||||
|
// re-reads the constant block years later.
|
||||||
|
|
||||||
|
func TestResolveProvisionConcurrency_UnsetUsesDefault(t *testing.T) {
|
||||||
|
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "")
|
||||||
|
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
|
||||||
|
t.Errorf("unset env: got %d, want %d", got, defaultProvisionConcurrency)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveProvisionConcurrency_ZeroIsUnlimited(t *testing.T) {
|
||||||
|
// "0" is the user-facing shorthand for "no cap". The implementation
|
||||||
|
// returns a large but finite cap so the channel-based semaphore
|
||||||
|
// stays a no-op without infinite-buffer risk.
|
||||||
|
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "0")
|
||||||
|
got := resolveProvisionConcurrency()
|
||||||
|
if got <= defaultProvisionConcurrency {
|
||||||
|
t.Errorf("0 should map to large 'unlimited' cap, got %d", got)
|
||||||
|
}
|
||||||
|
// 1<<20 today; pin the lower bound rather than the exact value so
|
||||||
|
// future tuning of the magic number doesn't break this test.
|
||||||
|
if got < 1024 {
|
||||||
|
t.Errorf("0 should map to a cap >= 1024 (effectively unlimited), got %d", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveProvisionConcurrency_PositiveIntegerExact(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
env string
|
||||||
|
want int
|
||||||
|
}{
|
||||||
|
{"1", 1},
|
||||||
|
{"5", 5},
|
||||||
|
{"10", 10},
|
||||||
|
{"50", 50},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.env, func(t *testing.T) {
|
||||||
|
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", tc.env)
|
||||||
|
if got := resolveProvisionConcurrency(); got != tc.want {
|
||||||
|
t.Errorf("env=%q: got %d, want %d", tc.env, got, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveProvisionConcurrency_NegativeFallsBackToDefault(t *testing.T) {
|
||||||
|
// Negative values are operator misconfiguration. Fall back to the
|
||||||
|
// safe default rather than passing through to make(chan, -5) which
|
||||||
|
// panics. The handler logs a warning so the operator notices.
|
||||||
|
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "-5")
|
||||||
|
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
|
||||||
|
t.Errorf("negative env: got %d, want default %d", got, defaultProvisionConcurrency)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveProvisionConcurrency_NonNumericFallsBackToDefault(t *testing.T) {
|
||||||
|
// Garbage in env shouldn't crash org-import. Common in dev when an
|
||||||
|
// operator types `MOLECULE_PROVISION_CONCURRENCY=true` or similar.
|
||||||
|
cases := []string{"true", "yes", "infinity", "ten", "3.5", "0x10"}
|
||||||
|
for _, raw := range cases {
|
||||||
|
t.Run(raw, func(t *testing.T) {
|
||||||
|
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", raw)
|
||||||
|
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
|
||||||
|
t.Errorf("non-numeric env=%q: got %d, want default %d",
|
||||||
|
raw, got, defaultProvisionConcurrency)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveProvisionConcurrency_WhitespaceTrimmed(t *testing.T) {
|
||||||
|
// Operators frequently set env vars with stray whitespace from
|
||||||
|
// copy-paste. Trim before parse so " 7 " == "7".
|
||||||
|
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", " 7 ")
|
||||||
|
if got := resolveProvisionConcurrency(); got != 7 {
|
||||||
|
t.Errorf("whitespace env: got %d, want 7", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user