Merge pull request #2794 from Molecule-AI/fix/cfg-prov-conc-iso

feat(org-import): make provision concurrency configurable via env
This commit is contained in:
Hongming Wang 2026-05-04 23:37:15 +00:00 committed by GitHub
commit 0dd1244510
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 164 additions and 6 deletions

View File

@ -11,6 +11,8 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
@ -25,10 +27,62 @@ import (
// during org import. Prevents overwhelming Docker when creating many containers.
const workspaceCreatePacingMs = 2000
// provisionConcurrency limits how many Docker containers can be provisioned
// simultaneously during org import. Without this, importing 39+ workspaces
// fires 39 goroutines that all hit Docker at once, causing timeouts (#1084).
const provisionConcurrency = 3
// defaultProvisionConcurrency is the fallback cap for parallel
// workspace-provision goroutines when MOLECULE_PROVISION_CONCURRENCY
// is unset. Originally a hard constant of 3 (PR #1084) calibrated for
// Docker-mode workspaces. The constant is now a default — operators
// running on EC2 (where each provision is a RunInstances call AWS
// happily parallelises) typically want a much higher cap, while
// Docker-mode dev environments still prefer the conservative 3.
//
// 3 keeps the existing Docker-mode behavior. SaaS deployments override
// via env (see resolveProvisionConcurrency below).
const defaultProvisionConcurrency = 3
// resolveProvisionConcurrency returns the effective semaphore size for
// org-import workspace provisioning, honoring MOLECULE_PROVISION_CONCURRENCY:
//
// - unset / empty / non-numeric → defaultProvisionConcurrency (3)
// - "0" → unlimited (a very large cap;
// practically no semaphore — used on
// SaaS where AWS RunInstances is the
// rate-limiter, not us)
// - any positive integer N → N
// - negative integer → defaultProvisionConcurrency (3),
// log warning so operator notices
// the misconfiguration
//
// The "0 = unlimited" mapping was a deliberate choice: an env var of "0"
// is the natural shorthand for "no cap" without forcing operators to
// type a magic large number. The implementation hands off a large but
// finite value (1<<20) so the channel still works as a regular
// buffered chan; goroutines will never block on the semaphore in
// practice.
func resolveProvisionConcurrency() int {
raw := strings.TrimSpace(os.Getenv("MOLECULE_PROVISION_CONCURRENCY"))
if raw == "" {
return defaultProvisionConcurrency
}
n, err := strconv.Atoi(raw)
if err != nil {
log.Printf("org_import: MOLECULE_PROVISION_CONCURRENCY=%q is not an integer; falling back to default %d",
raw, defaultProvisionConcurrency)
return defaultProvisionConcurrency
}
if n < 0 {
log.Printf("org_import: MOLECULE_PROVISION_CONCURRENCY=%d is negative; falling back to default %d",
n, defaultProvisionConcurrency)
return defaultProvisionConcurrency
}
if n == 0 {
// Unlimited semantics — use a large but finite cap so the
// chan-based semaphore stays a no-op. 1M is well past any
// realistic org-import size; AWS RunInstances rate-limit and
// account vCPU quota are the real backpressure here.
return 1 << 20
}
return n
}
// Child grid layout constants — kept in sync with canvas-topology.ts on
// the client. Children laid on import use the same 2-column grid so the
@ -600,8 +654,16 @@ func (h *OrgHandler) Import(c *gin.Context) {
results := []map[string]interface{}{}
var createErr error
// Semaphore limits concurrent Docker provisioning (#1084).
provisionSem := make(chan struct{}, provisionConcurrency)
// Semaphore limits concurrent provision goroutines (#1084).
// Cap is configurable via MOLECULE_PROVISION_CONCURRENCY:
// unset → 3 (Docker-mode default)
// "0" → effectively unlimited (SaaS / EC2 backend)
// N>0 → exactly N
// See resolveProvisionConcurrency for the full env-parse contract.
concurrency := resolveProvisionConcurrency()
provisionSem := make(chan struct{}, concurrency)
log.Printf("org_import: provision concurrency cap=%d (env MOLECULE_PROVISION_CONCURRENCY=%q)",
concurrency, os.Getenv("MOLECULE_PROVISION_CONCURRENCY"))
// Recursively create workspaces. Root workspaces keep their YAML
// canvas coords; children are positioned by createWorkspaceTree

View File

@ -0,0 +1,96 @@
package handlers
import (
"testing"
)
// Tests for resolveProvisionConcurrency — the env-parse contract that
// turns MOLECULE_PROVISION_CONCURRENCY into the channel-buffer size for
// the org-import provision semaphore.
//
// Why this matters: with the wrong cap, org-import either serializes
// (cap=1, slow) or stampedes the provider (cap=infinity on a backend
// that can't take it). The defaults — 3 for Docker, "0=unlimited" for
// EC2/SaaS — are what most operators want; the parse logic exists to
// route the env var to the right behavior without surprise.
//
// The "0 → unlimited" mapping is the user-facing piece worth pinning
// in tests: easy to misread as "0 means stop entirely" if someone
// re-reads the constant block years later.
func TestResolveProvisionConcurrency_UnsetUsesDefault(t *testing.T) {
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "")
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
t.Errorf("unset env: got %d, want %d", got, defaultProvisionConcurrency)
}
}
func TestResolveProvisionConcurrency_ZeroIsUnlimited(t *testing.T) {
// "0" is the user-facing shorthand for "no cap". The implementation
// returns a large but finite cap so the channel-based semaphore
// stays a no-op without infinite-buffer risk.
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "0")
got := resolveProvisionConcurrency()
if got <= defaultProvisionConcurrency {
t.Errorf("0 should map to large 'unlimited' cap, got %d", got)
}
// 1<<20 today; pin the lower bound rather than the exact value so
// future tuning of the magic number doesn't break this test.
if got < 1024 {
t.Errorf("0 should map to a cap >= 1024 (effectively unlimited), got %d", got)
}
}
func TestResolveProvisionConcurrency_PositiveIntegerExact(t *testing.T) {
cases := []struct {
env string
want int
}{
{"1", 1},
{"5", 5},
{"10", 10},
{"50", 50},
}
for _, tc := range cases {
t.Run(tc.env, func(t *testing.T) {
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", tc.env)
if got := resolveProvisionConcurrency(); got != tc.want {
t.Errorf("env=%q: got %d, want %d", tc.env, got, tc.want)
}
})
}
}
func TestResolveProvisionConcurrency_NegativeFallsBackToDefault(t *testing.T) {
// Negative values are operator misconfiguration. Fall back to the
// safe default rather than passing through to make(chan, -5) which
// panics. The handler logs a warning so the operator notices.
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "-5")
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
t.Errorf("negative env: got %d, want default %d", got, defaultProvisionConcurrency)
}
}
func TestResolveProvisionConcurrency_NonNumericFallsBackToDefault(t *testing.T) {
// Garbage in env shouldn't crash org-import. Common in dev when an
// operator types `MOLECULE_PROVISION_CONCURRENCY=true` or similar.
cases := []string{"true", "yes", "infinity", "ten", "3.5", "0x10"}
for _, raw := range cases {
t.Run(raw, func(t *testing.T) {
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", raw)
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
t.Errorf("non-numeric env=%q: got %d, want default %d",
raw, got, defaultProvisionConcurrency)
}
})
}
}
func TestResolveProvisionConcurrency_WhitespaceTrimmed(t *testing.T) {
// Operators frequently set env vars with stray whitespace from
// copy-paste. Trim before parse so " 7 " == "7".
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", " 7 ")
if got := resolveProvisionConcurrency(); got != 7 {
t.Errorf("whitespace env: got %d, want 7", got)
}
}