From 1ea615df4c5b74f409ba7ec5b104a25259debdf9 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Thu, 16 Apr 2026 11:45:42 -0700 Subject: [PATCH] =?UTF-8?q?feat(platform):=20auto-detect=20SaaS=20tenant?= =?UTF-8?q?=20=E2=86=92=20control=20plane=20provisioner?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No env vars to configure. The platform auto-detects the backend: MOLECULE_ORG_ID set → SaaS tenant → control plane provisioner MOLECULE_ORG_ID empty → self-hosted → Docker provisioner The control plane URL defaults to https://api.moleculesai.app (override with CP_PROVISION_URL for testing). No FLY_API_TOKEN on the tenant. Removed: direct Fly provisioner (FlyProvisioner) — all SaaS workspace provisioning goes through the control plane which holds the Fly token and manages billing, quotas, and cleanup. Two backends: CPProvisioner (SaaS) and Docker Provisioner (self-hosted). Closes #494 Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/cmd/server/main.go | 29 ++-- platform/entrypoint-tenant.sh | 2 + platform/internal/handlers/workspace.go | 17 +- .../internal/handlers/workspace_provision.go | 46 +++-- .../internal/provisioner/cp_provisioner.go | 160 ++++++++++++++++++ 5 files changed, 208 insertions(+), 46 deletions(-) create mode 100644 platform/internal/provisioner/cp_provisioner.go diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index 7b45b5a0..6d99701d 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -100,20 +100,23 @@ func main() { } }() - // Provisioner — select backend based on CONTAINER_BACKEND env var. - // "flyio" → Fly Machines API (SaaS tenants). Anything else → Docker (default). + // Provisioner — auto-detect backend: + // 1. MOLECULE_ORG_ID set → SaaS tenant → control plane provisioner + // 2. Docker available → self-hosted → Docker provisioner + // 3. Neither → provisioner disabled (external agents only) var prov *provisioner.Provisioner - var flyProv *provisioner.FlyProvisioner - switch os.Getenv("CONTAINER_BACKEND") { - case "flyio": - if fp, err := provisioner.NewFlyProvisioner(); err != nil { - log.Printf("Fly provisioner failed: %v", err) + var cpProv *provisioner.CPProvisioner + if os.Getenv("MOLECULE_ORG_ID") != "" { + // SaaS tenant — provision via control plane (holds Fly token, manages billing) + if cp, err := provisioner.NewCPProvisioner(); err != nil { + log.Printf("Control plane provisioner unavailable: %v", err) } else { - flyProv = fp - defer flyProv.Close() - log.Printf("Provisioner: Fly Machines (app=%s)", os.Getenv("FLY_WORKSPACE_APP")) + cpProv = cp + defer cpProv.Close() + log.Println("Provisioner: Control Plane (auto-detected SaaS tenant)") } - default: + } else { + // Self-hosted — use local Docker daemon if p, err := provisioner.New(); err != nil { log.Printf("Provisioner disabled (Docker not available): %v", err) } else { @@ -131,8 +134,8 @@ func main() { // WorkspaceHandler is created before the router so RestartByID can be wired into // the offline callbacks used by both the liveness monitor and the health sweep. wh := handlers.NewWorkspaceHandler(broadcaster, prov, platformURL, configsDir) - if flyProv != nil { - wh.SetFlyProvisioner(flyProv) + if cpProv != nil { + wh.SetCPProvisioner(cpProv) } // Offline handler: broadcast event + auto-restart the dead workspace diff --git a/platform/entrypoint-tenant.sh b/platform/entrypoint-tenant.sh index 5edb7c33..b1b63c44 100644 --- a/platform/entrypoint-tenant.sh +++ b/platform/entrypoint-tenant.sh @@ -18,6 +18,8 @@ CANVAS_PID=$! # Start Go platform in foreground-ish (we trap signals) # CANVAS_PROXY_URL tells the platform to proxy unmatched routes to Canvas. +# CONTAINER_BACKEND: empty = Docker (default for self-hosted/local). +# Set to "flyio" via Fly machine env to use Fly Machines API instead. export CANVAS_PROXY_URL="${CANVAS_PROXY_URL:-http://localhost:3000}" cd / /platform & diff --git a/platform/internal/handlers/workspace.go b/platform/internal/handlers/workspace.go index 82ac6d4b..f003317d 100644 --- a/platform/internal/handlers/workspace.go +++ b/platform/internal/handlers/workspace.go @@ -25,7 +25,7 @@ import ( type WorkspaceHandler struct { broadcaster *events.Broadcaster provisioner *provisioner.Provisioner - flyProv *provisioner.FlyProvisioner + cpProv *provisioner.CPProvisioner platformURL string configsDir string // path to workspace-configs-templates/ (for reading templates) // envMutators runs registered EnvMutator plugins right before @@ -44,11 +44,10 @@ func NewWorkspaceHandler(b *events.Broadcaster, p *provisioner.Provisioner, plat } } -// SetFlyProvisioner wires the Fly Machines provisioner. When set, -// workspace containers are provisioned as Fly Machines instead of -// local Docker containers. -func (h *WorkspaceHandler) SetFlyProvisioner(fp *provisioner.FlyProvisioner) { - h.flyProv = fp +// SetCPProvisioner wires the control plane provisioner for SaaS tenants. +// Auto-activated when MOLECULE_ORG_ID is set (no manual config needed). +func (h *WorkspaceHandler) SetCPProvisioner(cp *provisioner.CPProvisioner) { + h.cpProv = cp } // SetEnvMutators wires a provisionhook.Registry into the handler. Plugins @@ -202,9 +201,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { configFiles = h.ensureDefaultConfig(id, payload) } - // Auto-provision — start a container (Docker) or Fly Machine - if h.flyProv != nil { - go h.provisionWorkspaceFly(id, templatePath, configFiles, payload) + // Auto-provision — pick backend: control plane (SaaS) or Docker (self-hosted) + if h.cpProv != nil { + go h.provisionWorkspaceCP(id, templatePath, configFiles, payload) } else if h.provisioner != nil { go h.provisionWorkspace(id, templatePath, configFiles, payload) } else { diff --git a/platform/internal/handlers/workspace_provision.go b/platform/internal/handlers/workspace_provision.go index ed5d3f49..9f2ae8bc 100644 --- a/platform/internal/handlers/workspace_provision.go +++ b/platform/internal/handlers/workspace_provision.go @@ -459,13 +459,18 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model // provisionWorkspaceFly provisions a workspace as a Fly Machine instead // of a local Docker container. Same secret-loading + env-var logic as -// provisionWorkspaceOpts, but calls FlyProvisioner.Start instead of -// the Docker provisioner. -func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) { +// provisionWorkspaceFly is removed — use provisionWorkspaceCP instead. +// Direct Fly provisioning from the tenant was replaced by the control +// plane architecture (the CP holds the Fly token, manages billing/quotas). + +// provisionWorkspaceCP provisions a workspace via the control plane API. +// The control plane holds the Fly token and manages billing/quotas — the +// tenant platform never talks to Fly directly. +func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) { ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout) defer cancel() - // Load secrets (same as Docker path) + // Load secrets (same as Docker/Fly paths) envVars := map[string]string{} globalRows, globalErr := db.DB.QueryContext(ctx, `SELECT key, encrypted_value, encryption_version FROM global_secrets`) @@ -478,7 +483,7 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin if globalRows.Scan(&k, &v, &ver) == nil { decrypted, decErr := crypto.DecryptVersioned(v, ver) if decErr != nil { - log.Printf("FlyProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr) + log.Printf("CPProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr) db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k)) return @@ -504,28 +509,23 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin applyAgentGitIdentity(envVars, payload.Name) if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil { - log.Printf("FlyProvisioner: env mutator failed for %s: %v", workspaceID, err) + log.Printf("CPProvisioner: env mutator failed for %s: %v", workspaceID, err) db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, workspaceID, err.Error()) return } - awarenessNamespace := h.loadAwarenessNamespace(ctx, workspaceID) - cfg := provisioner.WorkspaceConfig{ - WorkspaceID: workspaceID, - TemplatePath: templatePath, - ConfigFiles: configFiles, - Tier: payload.Tier, - Runtime: payload.Runtime, - EnvVars: envVars, - PlatformURL: h.platformURL, - AwarenessNamespace: awarenessNamespace, + WorkspaceID: workspaceID, + Tier: payload.Tier, + Runtime: payload.Runtime, + EnvVars: envVars, + PlatformURL: h.platformURL, } - machineID, err := h.flyProv.Start(ctx, cfg) + machineID, err := h.cpProv.Start(ctx, cfg) if err != nil { - log.Printf("FlyProvisioner: failed to start workspace %s: %v", workspaceID, err) + log.Printf("CPProvisioner: failed to start workspace %s: %v", workspaceID, err) h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", workspaceID, map[string]interface{}{ "error": err.Error(), }) @@ -534,14 +534,12 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin return } - log.Printf("FlyProvisioner: workspace %s started as Fly machine %s", workspaceID, machineID) - // The workspace will register via POST /registry/register once the - // agent boots inside the Fly machine, which transitions it to 'online'. - // We issue a token preemptively so the agent can authenticate. + log.Printf("CPProvisioner: workspace %s started as machine %s via control plane", workspaceID, machineID) + // Issue token so the agent can authenticate on boot token, tokenErr := wsauth.IssueToken(ctx, db.DB, workspaceID) if tokenErr != nil { - log.Printf("FlyProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr) + log.Printf("CPProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr) } else { - log.Printf("FlyProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8]) + log.Printf("CPProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8]) } } diff --git a/platform/internal/provisioner/cp_provisioner.go b/platform/internal/provisioner/cp_provisioner.go new file mode 100644 index 00000000..6ef5c01a --- /dev/null +++ b/platform/internal/provisioner/cp_provisioner.go @@ -0,0 +1,160 @@ +package provisioner + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "time" +) + +// CPProvisioner provisions workspace agents by calling the control plane's +// workspace provision API. The control plane holds the Fly API token and +// manages billing/quotas/cleanup. The tenant platform never talks to Fly +// directly. +// +// Set CONTAINER_BACKEND=controlplane to activate. Requires CP_PROVISION_URL +// (control plane base URL, e.g. "https://api.moleculesai.app"). +type CPProvisioner struct { + baseURL string // e.g. "https://api.moleculesai.app" + orgID string // MOLECULE_ORG_ID — identifies which org is provisioning + httpClient *http.Client +} + +// NewCPProvisioner creates a provisioner that delegates to the control plane. +func NewCPProvisioner() (*CPProvisioner, error) { + orgID := os.Getenv("MOLECULE_ORG_ID") + if orgID == "" { + return nil, fmt.Errorf("MOLECULE_ORG_ID required for controlplane provisioner") + } + + // Auto-derive control plane URL. Priority: + // 1. Explicit CP_PROVISION_URL (override for testing) + // 2. Explicit MOLECULE_CP_URL + // 3. Default: https://api.moleculesai.app (production SaaS) + baseURL := os.Getenv("CP_PROVISION_URL") + if baseURL == "" { + baseURL = os.Getenv("MOLECULE_CP_URL") + } + if baseURL == "" { + baseURL = "https://api.moleculesai.app" + } + + return &CPProvisioner{ + baseURL: baseURL, + orgID: orgID, + httpClient: &http.Client{ + Timeout: 60 * time.Second, + }, + }, nil +} + +type cpProvisionRequest struct { + OrgID string `json:"org_id"` + WorkspaceID string `json:"workspace_id"` + Runtime string `json:"runtime"` + Tier int `json:"tier"` + PlatformURL string `json:"platform_url"` + Env map[string]string `json:"env"` +} + +type cpProvisionResponse struct { + MachineID string `json:"machine_id"` + Name string `json:"name"` + Region string `json:"region"` + Status string `json:"status"` + Error string `json:"error"` +} + +// Start provisions a workspace by calling the control plane. +func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) { + req := cpProvisionRequest{ + OrgID: p.orgID, + WorkspaceID: cfg.WorkspaceID, + Runtime: cfg.Runtime, + Tier: cfg.Tier, + PlatformURL: cfg.PlatformURL, + Env: cfg.EnvVars, + } + + body, err := json.Marshal(req) + if err != nil { + return "", fmt.Errorf("cp provisioner: marshal: %w", err) + } + + url := p.baseURL + "/cp/workspaces/provision" + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return "", fmt.Errorf("cp provisioner: create request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := p.httpClient.Do(httpReq) + if err != nil { + return "", fmt.Errorf("cp provisioner: send: %w", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + var result cpProvisionResponse + json.Unmarshal(respBody, &result) + + if resp.StatusCode != http.StatusCreated { + errMsg := result.Error + if errMsg == "" { + errMsg = string(respBody) + } + return "", fmt.Errorf("cp provisioner: provision failed (%d): %s", resp.StatusCode, errMsg) + } + + log.Printf("CP provisioner: workspace %s → machine %s in %s", cfg.WorkspaceID, result.MachineID, result.Region) + return result.MachineID, nil +} + +// Stop destroys the workspace machine via the control plane. +func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { + url := fmt.Sprintf("%s/cp/workspaces/%s", p.baseURL, workspaceID) + body, _ := json.Marshal(map[string]string{ + "org_id": p.orgID, + "workspace_id": workspaceID, + }) + + req, _ := http.NewRequestWithContext(ctx, "DELETE", url, bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + + resp, err := p.httpClient.Do(req) + if err != nil { + return fmt.Errorf("cp provisioner: stop: %w", err) + } + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("cp provisioner: stop failed (%d)", resp.StatusCode) + } + return nil +} + +// IsRunning checks workspace machine status via the control plane. +func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) { + url := fmt.Sprintf("%s/cp/workspaces/%s/status?machine_id=%s", p.baseURL, workspaceID, workspaceID) + req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) + + resp, err := p.httpClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + + var result struct { + State string `json:"state"` + } + json.NewDecoder(resp.Body).Decode(&result) + return result.State == "started", nil +} + +// Close is a no-op. +func (p *CPProvisioner) Close() error { return nil }