feat(org-import): make provision concurrency configurable via env

Org-import was hard-capped at 3 concurrent workspace provisions (#1084),
calibrated for Docker-mode workspaces where each provision was a
docker-run. Now that workspaces are EC2 instances, AWS RunInstances
parallelises happily and the artificial cap of 3 makes a 7-workspace
org-import take 3-4× longer than necessary (3 batches × ~70s/provision
≈ 4 min wall time when AWS could absorb all 7 in parallel for ~70s).

This PR makes the cap configurable via MOLECULE_PROVISION_CONCURRENCY:
  unset    → 3 (Docker-mode default, unchanged)
  "0"      → effectively unlimited (SaaS / EC2 backend; AWS rate-limit
             + vCPU quota are the real backpressure)
  N>0      → exactly N
  N<0      → fall back to default 3 + warning log
  garbage  → fall back to default 3 + warning log

The "0 = unlimited" mapping is the user-facing convention requested for
SaaS deployments — operators don't have to pick an arbitrary large
number. Implementation hands off 1<<20 internally so the channel-based
semaphore stays a no-op without infinite-buffer risk.

Test coverage (org_provision_concurrency_test.go, 6 cases / 15 subtests):
- unset → default
- "0" → large unlimited cap
- positive integer exact (1, 5, 10, 50)
- negative → default + warning
- non-numeric → default + warning
- whitespace-trimmed (" 7 " → 7)

Boot-time log line confirms the resolved cap so an operator can verify
their env is being honored without re-deploying.

Does NOT address the separate 600s "never registered" timeout the user
also reported during org-import — that's filed as molecule-core#2793
for proper investigation (parallel-provision contention, network
routing, register-retry budget, or container-start failure are all
candidates and need live SSM capture to bisect).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-05-04 16:33:49 -07:00
parent e39d818ac4
commit 3bc7749e84
2 changed files with 164 additions and 6 deletions

View File

@ -11,6 +11,8 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
@ -25,10 +27,62 @@ import (
// during org import. Prevents overwhelming Docker when creating many containers.
const workspaceCreatePacingMs = 2000
// provisionConcurrency limits how many Docker containers can be provisioned
// simultaneously during org import. Without this, importing 39+ workspaces
// fires 39 goroutines that all hit Docker at once, causing timeouts (#1084).
const provisionConcurrency = 3
// defaultProvisionConcurrency is the fallback cap for parallel
// workspace-provision goroutines when MOLECULE_PROVISION_CONCURRENCY
// is unset. Originally a hard constant of 3 (PR #1084) calibrated for
// Docker-mode workspaces. The constant is now a default — operators
// running on EC2 (where each provision is a RunInstances call AWS
// happily parallelises) typically want a much higher cap, while
// Docker-mode dev environments still prefer the conservative 3.
//
// 3 keeps the existing Docker-mode behavior. SaaS deployments override
// via env (see resolveProvisionConcurrency below).
const defaultProvisionConcurrency = 3
// resolveProvisionConcurrency returns the effective semaphore size for
// org-import workspace provisioning, honoring MOLECULE_PROVISION_CONCURRENCY:
//
// - unset / empty / non-numeric → defaultProvisionConcurrency (3)
// - "0" → unlimited (a very large cap;
// practically no semaphore — used on
// SaaS where AWS RunInstances is the
// rate-limiter, not us)
// - any positive integer N → N
// - negative integer → defaultProvisionConcurrency (3),
// log warning so operator notices
// the misconfiguration
//
// The "0 = unlimited" mapping was a deliberate choice: an env var of "0"
// is the natural shorthand for "no cap" without forcing operators to
// type a magic large number. The implementation hands off a large but
// finite value (1<<20) so the channel still works as a regular
// buffered chan; goroutines will never block on the semaphore in
// practice.
func resolveProvisionConcurrency() int {
raw := strings.TrimSpace(os.Getenv("MOLECULE_PROVISION_CONCURRENCY"))
if raw == "" {
return defaultProvisionConcurrency
}
n, err := strconv.Atoi(raw)
if err != nil {
log.Printf("org_import: MOLECULE_PROVISION_CONCURRENCY=%q is not an integer; falling back to default %d",
raw, defaultProvisionConcurrency)
return defaultProvisionConcurrency
}
if n < 0 {
log.Printf("org_import: MOLECULE_PROVISION_CONCURRENCY=%d is negative; falling back to default %d",
n, defaultProvisionConcurrency)
return defaultProvisionConcurrency
}
if n == 0 {
// Unlimited semantics — use a large but finite cap so the
// chan-based semaphore stays a no-op. 1M is well past any
// realistic org-import size; AWS RunInstances rate-limit and
// account vCPU quota are the real backpressure here.
return 1 << 20
}
return n
}
// Child grid layout constants — kept in sync with canvas-topology.ts on
// the client. Children laid on import use the same 2-column grid so the
@ -600,8 +654,16 @@ func (h *OrgHandler) Import(c *gin.Context) {
results := []map[string]interface{}{}
var createErr error
// Semaphore limits concurrent Docker provisioning (#1084).
provisionSem := make(chan struct{}, provisionConcurrency)
// Semaphore limits concurrent provision goroutines (#1084).
// Cap is configurable via MOLECULE_PROVISION_CONCURRENCY:
// unset → 3 (Docker-mode default)
// "0" → effectively unlimited (SaaS / EC2 backend)
// N>0 → exactly N
// See resolveProvisionConcurrency for the full env-parse contract.
concurrency := resolveProvisionConcurrency()
provisionSem := make(chan struct{}, concurrency)
log.Printf("org_import: provision concurrency cap=%d (env MOLECULE_PROVISION_CONCURRENCY=%q)",
concurrency, os.Getenv("MOLECULE_PROVISION_CONCURRENCY"))
// Recursively create workspaces. Root workspaces keep their YAML
// canvas coords; children are positioned by createWorkspaceTree

View File

@ -0,0 +1,96 @@
package handlers
import (
"testing"
)
// Tests for resolveProvisionConcurrency — the env-parse contract that
// turns MOLECULE_PROVISION_CONCURRENCY into the channel-buffer size for
// the org-import provision semaphore.
//
// Why this matters: with the wrong cap, org-import either serializes
// (cap=1, slow) or stampedes the provider (cap=infinity on a backend
// that can't take it). The defaults — 3 for Docker, "0=unlimited" for
// EC2/SaaS — are what most operators want; the parse logic exists to
// route the env var to the right behavior without surprise.
//
// The "0 → unlimited" mapping is the user-facing piece worth pinning
// in tests: easy to misread as "0 means stop entirely" if someone
// re-reads the constant block years later.
func TestResolveProvisionConcurrency_UnsetUsesDefault(t *testing.T) {
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "")
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
t.Errorf("unset env: got %d, want %d", got, defaultProvisionConcurrency)
}
}
func TestResolveProvisionConcurrency_ZeroIsUnlimited(t *testing.T) {
// "0" is the user-facing shorthand for "no cap". The implementation
// returns a large but finite cap so the channel-based semaphore
// stays a no-op without infinite-buffer risk.
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "0")
got := resolveProvisionConcurrency()
if got <= defaultProvisionConcurrency {
t.Errorf("0 should map to large 'unlimited' cap, got %d", got)
}
// 1<<20 today; pin the lower bound rather than the exact value so
// future tuning of the magic number doesn't break this test.
if got < 1024 {
t.Errorf("0 should map to a cap >= 1024 (effectively unlimited), got %d", got)
}
}
func TestResolveProvisionConcurrency_PositiveIntegerExact(t *testing.T) {
cases := []struct {
env string
want int
}{
{"1", 1},
{"5", 5},
{"10", 10},
{"50", 50},
}
for _, tc := range cases {
t.Run(tc.env, func(t *testing.T) {
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", tc.env)
if got := resolveProvisionConcurrency(); got != tc.want {
t.Errorf("env=%q: got %d, want %d", tc.env, got, tc.want)
}
})
}
}
func TestResolveProvisionConcurrency_NegativeFallsBackToDefault(t *testing.T) {
// Negative values are operator misconfiguration. Fall back to the
// safe default rather than passing through to make(chan, -5) which
// panics. The handler logs a warning so the operator notices.
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", "-5")
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
t.Errorf("negative env: got %d, want default %d", got, defaultProvisionConcurrency)
}
}
func TestResolveProvisionConcurrency_NonNumericFallsBackToDefault(t *testing.T) {
// Garbage in env shouldn't crash org-import. Common in dev when an
// operator types `MOLECULE_PROVISION_CONCURRENCY=true` or similar.
cases := []string{"true", "yes", "infinity", "ten", "3.5", "0x10"}
for _, raw := range cases {
t.Run(raw, func(t *testing.T) {
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", raw)
if got := resolveProvisionConcurrency(); got != defaultProvisionConcurrency {
t.Errorf("non-numeric env=%q: got %d, want default %d",
raw, got, defaultProvisionConcurrency)
}
})
}
}
func TestResolveProvisionConcurrency_WhitespaceTrimmed(t *testing.T) {
// Operators frequently set env vars with stray whitespace from
// copy-paste. Trim before parse so " 7 " == "7".
t.Setenv("MOLECULE_PROVISION_CONCURRENCY", " 7 ")
if got := resolveProvisionConcurrency(); got != 7 {
t.Errorf("whitespace env: got %d, want 7", got)
}
}