forked from molecule-ai/molecule-core
feat(platform): auto-detect SaaS tenant → control plane provisioner
No env vars to configure. The platform auto-detects the backend: MOLECULE_ORG_ID set → SaaS tenant → control plane provisioner; MOLECULE_ORG_ID empty → self-hosted → Docker provisioner. The control plane URL defaults to https://api.moleculesai.app (override with CP_PROVISION_URL for testing). No FLY_API_TOKEN on the tenant. Removed: direct Fly provisioner (FlyProvisioner) — all SaaS workspace provisioning goes through the control plane, which holds the Fly token and manages billing, quotas, and cleanup. Two backends remain: CPProvisioner (SaaS) and Docker Provisioner (self-hosted). Closes #494. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
08f5b2f0b3
commit
1ea615df4c
@ -100,20 +100,23 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
// Provisioner — select backend based on CONTAINER_BACKEND env var.
|
||||
// "flyio" → Fly Machines API (SaaS tenants). Anything else → Docker (default).
|
||||
// Provisioner — auto-detect backend:
|
||||
// 1. MOLECULE_ORG_ID set → SaaS tenant → control plane provisioner
|
||||
// 2. Docker available → self-hosted → Docker provisioner
|
||||
// 3. Neither → provisioner disabled (external agents only)
|
||||
var prov *provisioner.Provisioner
|
||||
var flyProv *provisioner.FlyProvisioner
|
||||
switch os.Getenv("CONTAINER_BACKEND") {
|
||||
case "flyio":
|
||||
if fp, err := provisioner.NewFlyProvisioner(); err != nil {
|
||||
log.Printf("Fly provisioner failed: %v", err)
|
||||
var cpProv *provisioner.CPProvisioner
|
||||
if os.Getenv("MOLECULE_ORG_ID") != "" {
|
||||
// SaaS tenant — provision via control plane (holds Fly token, manages billing)
|
||||
if cp, err := provisioner.NewCPProvisioner(); err != nil {
|
||||
log.Printf("Control plane provisioner unavailable: %v", err)
|
||||
} else {
|
||||
flyProv = fp
|
||||
defer flyProv.Close()
|
||||
log.Printf("Provisioner: Fly Machines (app=%s)", os.Getenv("FLY_WORKSPACE_APP"))
|
||||
cpProv = cp
|
||||
defer cpProv.Close()
|
||||
log.Println("Provisioner: Control Plane (auto-detected SaaS tenant)")
|
||||
}
|
||||
default:
|
||||
} else {
|
||||
// Self-hosted — use local Docker daemon
|
||||
if p, err := provisioner.New(); err != nil {
|
||||
log.Printf("Provisioner disabled (Docker not available): %v", err)
|
||||
} else {
|
||||
@ -131,8 +134,8 @@ func main() {
|
||||
// WorkspaceHandler is created before the router so RestartByID can be wired into
|
||||
// the offline callbacks used by both the liveness monitor and the health sweep.
|
||||
wh := handlers.NewWorkspaceHandler(broadcaster, prov, platformURL, configsDir)
|
||||
if flyProv != nil {
|
||||
wh.SetFlyProvisioner(flyProv)
|
||||
if cpProv != nil {
|
||||
wh.SetCPProvisioner(cpProv)
|
||||
}
|
||||
|
||||
// Offline handler: broadcast event + auto-restart the dead workspace
|
||||
|
||||
@ -18,6 +18,8 @@ CANVAS_PID=$!
|
||||
|
||||
# Start Go platform in foreground-ish (we trap signals)
|
||||
# CANVAS_PROXY_URL tells the platform to proxy unmatched routes to Canvas.
|
||||
# CONTAINER_BACKEND: empty = Docker (default for self-hosted/local).
|
||||
# Set to "flyio" via Fly machine env to use Fly Machines API instead.
|
||||
export CANVAS_PROXY_URL="${CANVAS_PROXY_URL:-http://localhost:3000}"
|
||||
cd /
|
||||
/platform &
|
||||
|
||||
@ -25,7 +25,7 @@ import (
|
||||
type WorkspaceHandler struct {
|
||||
broadcaster *events.Broadcaster
|
||||
provisioner *provisioner.Provisioner
|
||||
flyProv *provisioner.FlyProvisioner
|
||||
cpProv *provisioner.CPProvisioner
|
||||
platformURL string
|
||||
configsDir string // path to workspace-configs-templates/ (for reading templates)
|
||||
// envMutators runs registered EnvMutator plugins right before
|
||||
@ -44,11 +44,10 @@ func NewWorkspaceHandler(b *events.Broadcaster, p *provisioner.Provisioner, plat
|
||||
}
|
||||
}
|
||||
|
||||
// SetFlyProvisioner wires the Fly Machines provisioner. When set,
|
||||
// workspace containers are provisioned as Fly Machines instead of
|
||||
// local Docker containers.
|
||||
func (h *WorkspaceHandler) SetFlyProvisioner(fp *provisioner.FlyProvisioner) {
|
||||
h.flyProv = fp
|
||||
// SetCPProvisioner wires the control plane provisioner for SaaS tenants.
|
||||
// Auto-activated when MOLECULE_ORG_ID is set (no manual config needed).
|
||||
func (h *WorkspaceHandler) SetCPProvisioner(cp *provisioner.CPProvisioner) {
|
||||
h.cpProv = cp
|
||||
}
|
||||
|
||||
// SetEnvMutators wires a provisionhook.Registry into the handler. Plugins
|
||||
@ -202,9 +201,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
configFiles = h.ensureDefaultConfig(id, payload)
|
||||
}
|
||||
|
||||
// Auto-provision — start a container (Docker) or Fly Machine
|
||||
if h.flyProv != nil {
|
||||
go h.provisionWorkspaceFly(id, templatePath, configFiles, payload)
|
||||
// Auto-provision — pick backend: control plane (SaaS) or Docker (self-hosted)
|
||||
if h.cpProv != nil {
|
||||
go h.provisionWorkspaceCP(id, templatePath, configFiles, payload)
|
||||
} else if h.provisioner != nil {
|
||||
go h.provisionWorkspace(id, templatePath, configFiles, payload)
|
||||
} else {
|
||||
|
||||
@ -459,13 +459,18 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
|
||||
|
||||
// provisionWorkspaceFly provisions a workspace as a Fly Machine instead
|
||||
// of a local Docker container. Same secret-loading + env-var logic as
|
||||
// provisionWorkspaceOpts, but calls FlyProvisioner.Start instead of
|
||||
// the Docker provisioner.
|
||||
func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
|
||||
// provisionWorkspaceFly is removed — use provisionWorkspaceCP instead.
|
||||
// Direct Fly provisioning from the tenant was replaced by the control
|
||||
// plane architecture (the CP holds the Fly token, manages billing/quotas).
|
||||
|
||||
// provisionWorkspaceCP provisions a workspace via the control plane API.
|
||||
// The control plane holds the Fly token and manages billing/quotas — the
|
||||
// tenant platform never talks to Fly directly.
|
||||
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Load secrets (same as Docker path)
|
||||
// Load secrets (same as Docker/Fly paths)
|
||||
envVars := map[string]string{}
|
||||
globalRows, globalErr := db.DB.QueryContext(ctx,
|
||||
`SELECT key, encrypted_value, encryption_version FROM global_secrets`)
|
||||
@ -478,7 +483,7 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin
|
||||
if globalRows.Scan(&k, &v, &ver) == nil {
|
||||
decrypted, decErr := crypto.DecryptVersioned(v, ver)
|
||||
if decErr != nil {
|
||||
log.Printf("FlyProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr)
|
||||
log.Printf("CPProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr)
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
|
||||
workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k))
|
||||
return
|
||||
@ -504,28 +509,23 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin
|
||||
|
||||
applyAgentGitIdentity(envVars, payload.Name)
|
||||
if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil {
|
||||
log.Printf("FlyProvisioner: env mutator failed for %s: %v", workspaceID, err)
|
||||
log.Printf("CPProvisioner: env mutator failed for %s: %v", workspaceID, err)
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
|
||||
workspaceID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
awarenessNamespace := h.loadAwarenessNamespace(ctx, workspaceID)
|
||||
|
||||
cfg := provisioner.WorkspaceConfig{
|
||||
WorkspaceID: workspaceID,
|
||||
TemplatePath: templatePath,
|
||||
ConfigFiles: configFiles,
|
||||
Tier: payload.Tier,
|
||||
Runtime: payload.Runtime,
|
||||
EnvVars: envVars,
|
||||
PlatformURL: h.platformURL,
|
||||
AwarenessNamespace: awarenessNamespace,
|
||||
WorkspaceID: workspaceID,
|
||||
Tier: payload.Tier,
|
||||
Runtime: payload.Runtime,
|
||||
EnvVars: envVars,
|
||||
PlatformURL: h.platformURL,
|
||||
}
|
||||
|
||||
machineID, err := h.flyProv.Start(ctx, cfg)
|
||||
machineID, err := h.cpProv.Start(ctx, cfg)
|
||||
if err != nil {
|
||||
log.Printf("FlyProvisioner: failed to start workspace %s: %v", workspaceID, err)
|
||||
log.Printf("CPProvisioner: failed to start workspace %s: %v", workspaceID, err)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", workspaceID, map[string]interface{}{
|
||||
"error": err.Error(),
|
||||
})
|
||||
@ -534,14 +534,12 @@ func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath strin
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("FlyProvisioner: workspace %s started as Fly machine %s", workspaceID, machineID)
|
||||
// The workspace will register via POST /registry/register once the
|
||||
// agent boots inside the Fly machine, which transitions it to 'online'.
|
||||
// We issue a token preemptively so the agent can authenticate.
|
||||
log.Printf("CPProvisioner: workspace %s started as machine %s via control plane", workspaceID, machineID)
|
||||
// Issue token so the agent can authenticate on boot
|
||||
token, tokenErr := wsauth.IssueToken(ctx, db.DB, workspaceID)
|
||||
if tokenErr != nil {
|
||||
log.Printf("FlyProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr)
|
||||
log.Printf("CPProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr)
|
||||
} else {
|
||||
log.Printf("FlyProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8])
|
||||
log.Printf("CPProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8])
|
||||
}
|
||||
}
|
||||
|
||||
160
platform/internal/provisioner/cp_provisioner.go
Normal file
160
platform/internal/provisioner/cp_provisioner.go
Normal file
@ -0,0 +1,160 @@
|
||||
package provisioner
|
||||
|
||||
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"
)
|
||||
|
||||
// CPProvisioner provisions workspace agents by calling the control plane's
// workspace provision API. The control plane holds the Fly API token and
// manages billing/quotas/cleanup. The tenant platform never talks to Fly
// directly.
//
// No backend switch is needed: NewCPProvisioner only requires MOLECULE_ORG_ID
// to be set. The control plane base URL is read from CP_PROVISION_URL, then
// MOLECULE_CP_URL, and otherwise defaults to "https://api.moleculesai.app".
type CPProvisioner struct {
	baseURL    string       // e.g. "https://api.moleculesai.app"
	orgID      string       // MOLECULE_ORG_ID — identifies which org is provisioning
	httpClient *http.Client // client used for every control plane request
}
|
||||
|
||||
// NewCPProvisioner creates a provisioner that delegates to the control plane.
|
||||
func NewCPProvisioner() (*CPProvisioner, error) {
|
||||
orgID := os.Getenv("MOLECULE_ORG_ID")
|
||||
if orgID == "" {
|
||||
return nil, fmt.Errorf("MOLECULE_ORG_ID required for controlplane provisioner")
|
||||
}
|
||||
|
||||
// Auto-derive control plane URL. Priority:
|
||||
// 1. Explicit CP_PROVISION_URL (override for testing)
|
||||
// 2. Explicit MOLECULE_CP_URL
|
||||
// 3. Default: https://api.moleculesai.app (production SaaS)
|
||||
baseURL := os.Getenv("CP_PROVISION_URL")
|
||||
if baseURL == "" {
|
||||
baseURL = os.Getenv("MOLECULE_CP_URL")
|
||||
}
|
||||
if baseURL == "" {
|
||||
baseURL = "https://api.moleculesai.app"
|
||||
}
|
||||
|
||||
return &CPProvisioner{
|
||||
baseURL: baseURL,
|
||||
orgID: orgID,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 60 * time.Second,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// cpProvisionRequest is the JSON body Start posts to the control plane's
// /cp/workspaces/provision endpoint.
type cpProvisionRequest struct {
	OrgID       string            `json:"org_id"`       // org on whose behalf the workspace is provisioned
	WorkspaceID string            `json:"workspace_id"` // workspace being provisioned
	Runtime     string            `json:"runtime"`      // copied from WorkspaceConfig.Runtime
	Tier        int               `json:"tier"`         // copied from WorkspaceConfig.Tier
	PlatformURL string            `json:"platform_url"` // copied from WorkspaceConfig.PlatformURL
	Env         map[string]string `json:"env"`          // environment variables for the workspace
}
|
||||
|
||||
// cpProvisionResponse is the JSON body decoded from the control plane's reply
// to a provision request. Start returns MachineID on success and uses Error
// as the failure message when the reply status is not 201 Created.
type cpProvisionResponse struct {
	MachineID string `json:"machine_id"`
	Name      string `json:"name"`
	Region    string `json:"region"`
	Status    string `json:"status"`
	Error     string `json:"error"`
}
|
||||
|
||||
// Start provisions a workspace by calling the control plane.
|
||||
func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) {
|
||||
req := cpProvisionRequest{
|
||||
OrgID: p.orgID,
|
||||
WorkspaceID: cfg.WorkspaceID,
|
||||
Runtime: cfg.Runtime,
|
||||
Tier: cfg.Tier,
|
||||
PlatformURL: cfg.PlatformURL,
|
||||
Env: cfg.EnvVars,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cp provisioner: marshal: %w", err)
|
||||
}
|
||||
|
||||
url := p.baseURL + "/cp/workspaces/provision"
|
||||
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cp provisioner: create request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := p.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cp provisioner: send: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
var result cpProvisionResponse
|
||||
json.Unmarshal(respBody, &result)
|
||||
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
errMsg := result.Error
|
||||
if errMsg == "" {
|
||||
errMsg = string(respBody)
|
||||
}
|
||||
return "", fmt.Errorf("cp provisioner: provision failed (%d): %s", resp.StatusCode, errMsg)
|
||||
}
|
||||
|
||||
log.Printf("CP provisioner: workspace %s → machine %s in %s", cfg.WorkspaceID, result.MachineID, result.Region)
|
||||
return result.MachineID, nil
|
||||
}
|
||||
|
||||
// Stop destroys the workspace machine via the control plane.
|
||||
func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
|
||||
url := fmt.Sprintf("%s/cp/workspaces/%s", p.baseURL, workspaceID)
|
||||
body, _ := json.Marshal(map[string]string{
|
||||
"org_id": p.orgID,
|
||||
"workspace_id": workspaceID,
|
||||
})
|
||||
|
||||
req, _ := http.NewRequestWithContext(ctx, "DELETE", url, bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := p.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cp provisioner: stop: %w", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("cp provisioner: stop failed (%d)", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning checks workspace machine status via the control plane.
|
||||
func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
|
||||
url := fmt.Sprintf("%s/cp/workspaces/%s/status?machine_id=%s", p.baseURL, workspaceID, workspaceID)
|
||||
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
|
||||
resp, err := p.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var result struct {
|
||||
State string `json:"state"`
|
||||
}
|
||||
json.NewDecoder(resp.Body).Decode(&result)
|
||||
return result.State == "started", nil
|
||||
}
|
||||
|
||||
// Close is a no-op.
|
||||
func (p *CPProvisioner) Close() error {
	// No teardown is performed; the call always reports success.
	return nil
}
|
||||
Loading…
Reference in New Issue
Block a user