Merge pull request #503 from Molecule-AI/feat/controlplane-provisioner

feat(platform): control plane provisioner (CONTAINER_BACKEND=controlplane)
This commit is contained in:
Hongming Wang 2026-04-16 11:54:07 -07:00 committed by GitHub
commit b1e971e4ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 208 additions and 46 deletions

View File

@ -100,20 +100,23 @@ func main() {
}
}()
// Provisioner — select backend based on CONTAINER_BACKEND env var.
// "flyio" → Fly Machines API (SaaS tenants). Anything else → Docker (default).
// Provisioner — auto-detect backend:
// 1. MOLECULE_ORG_ID set → SaaS tenant → control plane provisioner
// 2. Docker available → self-hosted → Docker provisioner
// 3. Neither → provisioner disabled (external agents only)
var prov *provisioner.Provisioner
var flyProv *provisioner.FlyProvisioner
switch os.Getenv("CONTAINER_BACKEND") {
case "flyio":
if fp, err := provisioner.NewFlyProvisioner(); err != nil {
log.Printf("Fly provisioner failed: %v", err)
var cpProv *provisioner.CPProvisioner
if os.Getenv("MOLECULE_ORG_ID") != "" {
// SaaS tenant — provision via control plane (holds Fly token, manages billing)
if cp, err := provisioner.NewCPProvisioner(); err != nil {
log.Printf("Control plane provisioner unavailable: %v", err)
} else {
flyProv = fp
defer flyProv.Close()
log.Printf("Provisioner: Fly Machines (app=%s)", os.Getenv("FLY_WORKSPACE_APP"))
cpProv = cp
defer cpProv.Close()
log.Println("Provisioner: Control Plane (auto-detected SaaS tenant)")
}
default:
} else {
// Self-hosted — use local Docker daemon
if p, err := provisioner.New(); err != nil {
log.Printf("Provisioner disabled (Docker not available): %v", err)
} else {
@ -131,8 +134,8 @@ func main() {
// WorkspaceHandler is created before the router so RestartByID can be wired into
// the offline callbacks used by both the liveness monitor and the health sweep.
wh := handlers.NewWorkspaceHandler(broadcaster, prov, platformURL, configsDir)
if flyProv != nil {
wh.SetFlyProvisioner(flyProv)
if cpProv != nil {
wh.SetCPProvisioner(cpProv)
}
// Offline handler: broadcast event + auto-restart the dead workspace

View File

@ -18,6 +18,8 @@ CANVAS_PID=$!
# Start Go platform in foreground-ish (we trap signals)
# CANVAS_PROXY_URL tells the platform to proxy unmatched routes to Canvas.
# Provisioner backend is auto-detected by the platform: MOLECULE_ORG_ID set =
# control plane (SaaS tenant); unset = local Docker (default for self-hosted/local).
export CANVAS_PROXY_URL="${CANVAS_PROXY_URL:-http://localhost:3000}"
cd /
/platform &

View File

@ -25,7 +25,7 @@ import (
type WorkspaceHandler struct {
broadcaster *events.Broadcaster
provisioner *provisioner.Provisioner
flyProv *provisioner.FlyProvisioner
cpProv *provisioner.CPProvisioner
platformURL string
configsDir string // path to workspace-configs-templates/ (for reading templates)
// envMutators runs registered EnvMutator plugins right before
@ -44,11 +44,10 @@ func NewWorkspaceHandler(b *events.Broadcaster, p *provisioner.Provisioner, plat
}
}
// SetFlyProvisioner wires the Fly Machines provisioner. When set,
// workspace containers are provisioned as Fly Machines instead of
// local Docker containers.
func (h *WorkspaceHandler) SetFlyProvisioner(fp *provisioner.FlyProvisioner) {
h.flyProv = fp
// SetCPProvisioner wires the control plane provisioner for SaaS tenants.
// Auto-activated when MOLECULE_ORG_ID is set (no manual config needed).
func (h *WorkspaceHandler) SetCPProvisioner(cp *provisioner.CPProvisioner) {
h.cpProv = cp
}
// SetEnvMutators wires a provisionhook.Registry into the handler. Plugins
@ -202,9 +201,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
configFiles = h.ensureDefaultConfig(id, payload)
}
// Auto-provision — start a container (Docker) or Fly Machine
if h.flyProv != nil {
go h.provisionWorkspaceFly(id, templatePath, configFiles, payload)
// Auto-provision — pick backend: control plane (SaaS) or Docker (self-hosted)
if h.cpProv != nil {
go h.provisionWorkspaceCP(id, templatePath, configFiles, payload)
} else if h.provisioner != nil {
go h.provisionWorkspace(id, templatePath, configFiles, payload)
} else {

View File

@ -459,13 +459,18 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
// provisionWorkspaceFly provisions a workspace as a Fly Machine instead
// of a local Docker container. Same secret-loading + env-var logic as
// provisionWorkspaceOpts, but calls FlyProvisioner.Start instead of
// the Docker provisioner.
func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
// provisionWorkspaceFly is removed — use provisionWorkspaceCP instead.
// Direct Fly provisioning from the tenant was replaced by the control
// plane architecture (the CP holds the Fly token, manages billing/quotas).
// provisionWorkspaceCP provisions a workspace via the control plane API.
// The control plane holds the Fly token and manages billing/quotas — the
// tenant platform never talks to Fly directly.
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()
// Load secrets (same as Docker path)
// Load secrets (same as Docker/Fly paths)
envVars := map[string]string{}
globalRows, globalErr := db.DB.QueryContext(ctx,
`SELECT key, encrypted_value, encryption_version FROM global_secrets`)
@ -478,7 +483,7 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin
if globalRows.Scan(&k, &v, &ver) == nil {
decrypted, decErr := crypto.DecryptVersioned(v, ver)
if decErr != nil {
log.Printf("FlyProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr)
log.Printf("CPProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr)
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k))
return
@ -504,28 +509,23 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin
applyAgentGitIdentity(envVars, payload.Name)
if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil {
log.Printf("FlyProvisioner: env mutator failed for %s: %v", workspaceID, err)
log.Printf("CPProvisioner: env mutator failed for %s: %v", workspaceID, err)
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
workspaceID, err.Error())
return
}
awarenessNamespace := h.loadAwarenessNamespace(ctx, workspaceID)
cfg := provisioner.WorkspaceConfig{
WorkspaceID: workspaceID,
TemplatePath: templatePath,
ConfigFiles: configFiles,
Tier: payload.Tier,
Runtime: payload.Runtime,
EnvVars: envVars,
PlatformURL: h.platformURL,
AwarenessNamespace: awarenessNamespace,
WorkspaceID: workspaceID,
Tier: payload.Tier,
Runtime: payload.Runtime,
EnvVars: envVars,
PlatformURL: h.platformURL,
}
machineID, err := h.flyProv.Start(ctx, cfg)
machineID, err := h.cpProv.Start(ctx, cfg)
if err != nil {
log.Printf("FlyProvisioner: failed to start workspace %s: %v", workspaceID, err)
log.Printf("CPProvisioner: failed to start workspace %s: %v", workspaceID, err)
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", workspaceID, map[string]interface{}{
"error": err.Error(),
})
@ -534,14 +534,12 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin
return
}
log.Printf("FlyProvisioner: workspace %s started as Fly machine %s", workspaceID, machineID)
// The workspace will register via POST /registry/register once the
// agent boots inside the Fly machine, which transitions it to 'online'.
// We issue a token preemptively so the agent can authenticate.
log.Printf("CPProvisioner: workspace %s started as machine %s via control plane", workspaceID, machineID)
// Issue token so the agent can authenticate on boot
token, tokenErr := wsauth.IssueToken(ctx, db.DB, workspaceID)
if tokenErr != nil {
log.Printf("FlyProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr)
log.Printf("CPProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr)
} else {
log.Printf("FlyProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8])
log.Printf("CPProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8])
}
}

View File

@ -0,0 +1,160 @@
package provisioner
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
)
// CPProvisioner provisions workspace agents by calling the control plane's
// workspace provision API. The control plane holds the Fly API token and
// manages billing/quotas/cleanup. The tenant platform never talks to Fly
// directly.
//
// Construct it with NewCPProvisioner, which requires MOLECULE_ORG_ID and
// resolves the control plane base URL from CP_PROVISION_URL, then
// MOLECULE_CP_URL, then the default "https://api.moleculesai.app".
type CPProvisioner struct {
baseURL string // control plane base URL, e.g. "https://api.moleculesai.app"
orgID string // MOLECULE_ORG_ID — identifies which org is provisioning
// httpClient is the client used by Start, Stop and IsRunning for every
// control plane request.
httpClient *http.Client
}
// NewCPProvisioner creates a provisioner that delegates to the control plane.
//
// It returns an error when MOLECULE_ORG_ID is unset. The control plane base
// URL is resolved in priority order:
//  1. CP_PROVISION_URL (explicit override, e.g. for testing)
//  2. MOLECULE_CP_URL
//  3. https://api.moleculesai.app (production SaaS default)
//
// Trailing slashes are stripped from the resolved URL so that endpoint paths
// can be appended to it without producing "//" in the request URL.
func NewCPProvisioner() (*CPProvisioner, error) {
	orgID := os.Getenv("MOLECULE_ORG_ID")
	if orgID == "" {
		return nil, fmt.Errorf("MOLECULE_ORG_ID required for controlplane provisioner")
	}

	baseURL := os.Getenv("CP_PROVISION_URL")
	if baseURL == "" {
		baseURL = os.Getenv("MOLECULE_CP_URL")
	}
	if baseURL == "" {
		baseURL = "https://api.moleculesai.app"
	}
	// Every endpoint is built as baseURL + "/cp/...", so a configured value
	// such as "https://host/" would otherwise yield "https://host//cp/...".
	for len(baseURL) > 0 && baseURL[len(baseURL)-1] == '/' {
		baseURL = baseURL[:len(baseURL)-1]
	}

	return &CPProvisioner{
		baseURL: baseURL,
		orgID:   orgID,
		httpClient: &http.Client{
			// Upper bound for a single control plane call, independent of
			// any deadline carried by the caller's context.
			Timeout: 60 * time.Second,
		},
	}, nil
}
// cpProvisionRequest is the JSON body that Start POSTs to the control
// plane's /cp/workspaces/provision endpoint.
type cpProvisionRequest struct {
// OrgID is the provisioner's own org ID (read from MOLECULE_ORG_ID at
// construction time); the remaining fields are copied from the
// WorkspaceConfig passed to Start.
OrgID string `json:"org_id"`
WorkspaceID string `json:"workspace_id"`
Runtime string `json:"runtime"`
Tier int `json:"tier"`
PlatformURL string `json:"platform_url"`
// Env carries WorkspaceConfig.EnvVars.
Env map[string]string `json:"env"`
}
// cpProvisionResponse is the JSON body Start decodes from the control
// plane's reply to a provision request.
type cpProvisionResponse struct {
// MachineID is what Start returns to its caller on success.
MachineID string `json:"machine_id"`
// Name is decoded but not read anywhere in this file.
Name string `json:"name"`
// Region is only used in Start's success log line.
Region string `json:"region"`
// Status is decoded but not read anywhere in this file.
Status string `json:"status"`
// Error, when non-empty, is used as the message of the error Start
// returns for a non-201 reply.
Error string `json:"error"`
}
// Start provisions a workspace by calling the control plane.
//
// It POSTs cfg's workspace ID, runtime, tier, platform URL and env vars,
// together with the provisioner's org ID, as JSON to
// {baseURL}/cp/workspaces/provision and returns the machine ID from the reply.
//
// An error is returned when the request cannot be built or sent, when the
// response body cannot be read, when the status is not 201 Created (the
// message carries the reply's "error" field, or the raw body if that field is
// empty), or when a 201 reply cannot be decoded or carries no machine_id.
// On error the returned machine ID is always empty.
func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) {
	payload, err := json.Marshal(cpProvisionRequest{
		OrgID:       p.orgID,
		WorkspaceID: cfg.WorkspaceID,
		Runtime:     cfg.Runtime,
		Tier:        cfg.Tier,
		PlatformURL: cfg.PlatformURL,
		Env:         cfg.EnvVars,
	})
	if err != nil {
		return "", fmt.Errorf("cp provisioner: marshal: %w", err)
	}

	endpoint := p.baseURL + "/cp/workspaces/provision"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return "", fmt.Errorf("cp provisioner: create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(httpReq)
	if err != nil {
		return "", fmt.Errorf("cp provisioner: send: %w", err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("cp provisioner: read response (%d): %w", resp.StatusCode, err)
	}

	var result cpProvisionResponse
	decodeErr := json.Unmarshal(respBody, &result)

	if resp.StatusCode != http.StatusCreated {
		// Decoding is best-effort here: a failure reply may not be JSON at
		// all (e.g. a proxy error page), in which case the raw body is the
		// most useful thing to surface.
		errMsg := result.Error
		if errMsg == "" {
			errMsg = string(respBody)
		}
		return "", fmt.Errorf("cp provisioner: provision failed (%d): %s", resp.StatusCode, errMsg)
	}

	// On success the body must be usable; otherwise the caller would record
	// a workspace as started with an empty machine ID.
	if decodeErr != nil {
		return "", fmt.Errorf("cp provisioner: decode response: %w", decodeErr)
	}
	if result.MachineID == "" {
		return "", fmt.Errorf("cp provisioner: response for workspace %s has no machine_id", cfg.WorkspaceID)
	}

	log.Printf("CP provisioner: workspace %s → machine %s in %s", cfg.WorkspaceID, result.MachineID, result.Region)
	return result.MachineID, nil
}
// Stop destroys the workspace machine via the control plane.
//
// It sends DELETE {baseURL}/cp/workspaces/{workspaceID} with a JSON body
// carrying the org ID and the workspace ID. An error is returned when the
// request cannot be built or sent, or when the reply status is anything other
// than 200 OK.
func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
	endpoint := fmt.Sprintf("%s/cp/workspaces/%s", p.baseURL, workspaceID)

	body, err := json.Marshal(map[string]string{
		"org_id":       p.orgID,
		"workspace_id": workspaceID,
	})
	if err != nil {
		return fmt.Errorf("cp provisioner: stop: marshal: %w", err)
	}

	// The request-construction error must be checked: for an unparseable
	// URL the returned request is nil and setting a header on it would panic.
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("cp provisioner: stop: create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("cp provisioner: stop: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("cp provisioner: stop failed (%d)", resp.StatusCode)
	}
	return nil
}
// IsRunning checks workspace machine status via the control plane.
//
// It GETs {baseURL}/cp/workspaces/{workspaceID}/status?machine_id={workspaceID}
// and reports true only when the reply's "state" field equals "started".
//
// A 404 reply is reported as (false, nil). Any other non-200 status, a
// transport failure, or an undecodable 200 body is returned as an error, so
// that a control plane outage is not mistaken for a stopped workspace.
//
// NOTE(review): the workspace ID is also sent as the machine_id query
// parameter because no machine ID is available here — confirm the control
// plane resolves the machine from the workspace ID.
// NOTE(review): 404 is kept as "not running" to match what callers saw
// before status codes were checked — confirm that is the intended meaning.
func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
	endpoint := fmt.Sprintf("%s/cp/workspaces/%s/status?machine_id=%s", p.baseURL, workspaceID, workspaceID)

	// The request-construction error must be checked: for an unparseable
	// URL the returned request is nil and passing it to Do would panic.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return false, fmt.Errorf("cp provisioner: status: create request: %w", err)
	}

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return false, fmt.Errorf("cp provisioner: status: %w", err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// fall through to decoding below
	case http.StatusNotFound:
		return false, nil
	default:
		return false, fmt.Errorf("cp provisioner: status failed (%d)", resp.StatusCode)
	}

	var result struct {
		State string `json:"state"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return false, fmt.Errorf("cp provisioner: status: decode: %w", err)
	}
	return result.State == "started", nil
}
// Close does nothing and always returns nil.
func (p *CPProvisioner) Close() error {
	return nil
}