diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index 12dcc710..7b45b5a0 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -100,13 +100,27 @@ func main() { } }() - // Provisioner (optional — gracefully degrades if Docker not available) + // Provisioner — select backend based on CONTAINER_BACKEND env var. + // "flyio" → Fly Machines API (SaaS tenants). Anything else → Docker (default). var prov *provisioner.Provisioner - if p, err := provisioner.New(); err != nil { - log.Printf("Provisioner disabled (Docker not available): %v", err) - } else { - prov = p - defer prov.Close() + var flyProv *provisioner.FlyProvisioner + switch os.Getenv("CONTAINER_BACKEND") { + case "flyio": + if fp, err := provisioner.NewFlyProvisioner(); err != nil { + log.Printf("Fly provisioner failed: %v", err) + } else { + flyProv = fp + defer flyProv.Close() + log.Printf("Provisioner: Fly Machines (app=%s)", os.Getenv("FLY_WORKSPACE_APP")) + } + default: + if p, err := provisioner.New(); err != nil { + log.Printf("Provisioner disabled (Docker not available): %v", err) + } else { + prov = p + defer prov.Close() + log.Println("Provisioner: Docker") + } } port := envOr("PORT", "8080") @@ -117,6 +131,9 @@ func main() { // WorkspaceHandler is created before the router so RestartByID can be wired into // the offline callbacks used by both the liveness monitor and the health sweep. wh := handlers.NewWorkspaceHandler(broadcaster, prov, platformURL, configsDir) + if flyProv != nil { + wh.SetFlyProvisioner(flyProv) + } // Offline handler: broadcast event + auto-restart the dead workspace onWorkspaceOffline := func(innerCtx context.Context, workspaceID string) { diff --git a/platform/internal/handlers/workspace.go b/platform/internal/handlers/workspace.go index 1cb79886..580cc0d3 100644 --- a/platform/internal/handlers/workspace.go +++ b/platform/internal/handlers/workspace.go @@ -24,6 +24,7 @@ import ( type WorkspaceHandler struct { broadcaster *events.Broadcaster provisioner *provisioner.Provisioner + flyProv *provisioner.FlyProvisioner platformURL string configsDir string // path to workspace-configs-templates/ (for reading templates) // envMutators runs registered EnvMutator plugins right before @@ -42,6 +43,13 @@ func NewWorkspaceHandler(b *events.Broadcaster, p *provisioner.Provisioner, plat } } +// SetFlyProvisioner wires the Fly Machines provisioner. When set, +// workspace containers are provisioned as Fly Machines instead of +// local Docker containers. +func (h *WorkspaceHandler) SetFlyProvisioner(fp *provisioner.FlyProvisioner) { + h.flyProv = fp +} + // SetEnvMutators wires a provisionhook.Registry into the handler. Plugins // living in separate repos register on the same Registry instance during // boot (see cmd/server/main.go) and main.go calls this setter once before @@ -193,8 +201,10 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { configFiles = h.ensureDefaultConfig(id, payload) } - // Auto-provision — start a container - if h.provisioner != nil { + // Auto-provision — start a container (Docker) or Fly Machine + if h.flyProv != nil { + go h.provisionWorkspaceFly(id, templatePath, configFiles, payload) + } else if h.provisioner != nil { go h.provisionWorkspace(id, templatePath, configFiles, payload) } else { // No Docker available (SaaS tenant). Persist basic config as JSON diff --git a/platform/internal/handlers/workspace_provision.go b/platform/internal/handlers/workspace_provision.go index 73fc3fe5..ed5d3f49 100644 --- a/platform/internal/handlers/workspace_provision.go +++ b/platform/internal/handlers/workspace_provision.go @@ -456,3 +456,92 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model log.Printf("Provisioner: generated %d config files for workspace %s (runtime: %s)", len(files), workspaceID, runtime) return files } + +// provisionWorkspaceFly provisions a workspace as a Fly Machine instead +// of a local Docker container. Same secret-loading + env-var logic as +// provisionWorkspaceOpts, but calls FlyProvisioner.Start instead of +// the Docker provisioner. +func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) { + ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout) + defer cancel() + + // Load secrets (same as Docker path) + envVars := map[string]string{} + globalRows, globalErr := db.DB.QueryContext(ctx, + `SELECT key, encrypted_value, encryption_version FROM global_secrets`) + if globalErr == nil { + defer globalRows.Close() + for globalRows.Next() { + var k string + var v []byte + var ver int + if globalRows.Scan(&k, &v, &ver) == nil { + decrypted, decErr := crypto.DecryptVersioned(v, ver) + if decErr != nil { + log.Printf("FlyProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr) + db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, + workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k)) + return + } + envVars[k] = string(decrypted) + } + } + } + wsRows, err := db.DB.QueryContext(ctx, + `SELECT key, encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1`, workspaceID) + if err == nil { + defer wsRows.Close() + for wsRows.Next() { + var k string + var v []byte + var ver int + if wsRows.Scan(&k, &v, &ver) == nil { + decrypted, _ := crypto.DecryptVersioned(v, ver) + envVars[k] = string(decrypted) + } + } + } + + applyAgentGitIdentity(envVars, payload.Name) + if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil { + log.Printf("FlyProvisioner: env mutator failed for %s: %v", workspaceID, err) + db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, + workspaceID, err.Error()) + return + } + + awarenessNamespace := h.loadAwarenessNamespace(ctx, workspaceID) + + cfg := provisioner.WorkspaceConfig{ + WorkspaceID: workspaceID, + TemplatePath: templatePath, + ConfigFiles: configFiles, + Tier: payload.Tier, + Runtime: payload.Runtime, + EnvVars: envVars, + PlatformURL: h.platformURL, + AwarenessNamespace: awarenessNamespace, + } + + machineID, err := h.flyProv.Start(ctx, cfg) + if err != nil { + log.Printf("FlyProvisioner: failed to start workspace %s: %v", workspaceID, err) + h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", workspaceID, map[string]interface{}{ + "error": err.Error(), + }) + db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, + workspaceID, err.Error()) + return + } + + log.Printf("FlyProvisioner: workspace %s started as Fly machine %s", workspaceID, machineID) + // The workspace will register via POST /registry/register once the + // agent boots inside the Fly machine, which transitions it to 'online'. + // We issue a token preemptively so the agent can authenticate. + token, tokenErr := wsauth.IssueToken(ctx, db.DB, workspaceID) + if tokenErr != nil { + log.Printf("FlyProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr) + } else { + log.Printf("FlyProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, token[:8]) + } +} diff --git a/platform/internal/provisioner/fly_provisioner.go b/platform/internal/provisioner/fly_provisioner.go new file mode 100644 index 00000000..e6868c04 --- /dev/null +++ b/platform/internal/provisioner/fly_provisioner.go @@ -0,0 +1,292 @@ +package provisioner + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "time" +) + +// FlyRuntimeImages maps runtime names to their GHCR image tags for Fly. +// These are the same images as RuntimeImages but use the full registry path +// since Fly machines pull from a registry, not a local Docker daemon. +var FlyRuntimeImages = map[string]string{ + "langgraph": "ghcr.io/molecule-ai/workspace-langgraph:latest", + "claude-code": "ghcr.io/molecule-ai/workspace-claude-code:latest", + "openclaw": "ghcr.io/molecule-ai/workspace-openclaw:latest", + "deepagents": "ghcr.io/molecule-ai/workspace-deepagents:latest", + "crewai": "ghcr.io/molecule-ai/workspace-crewai:latest", + "autogen": "ghcr.io/molecule-ai/workspace-autogen:latest", + "hermes": "ghcr.io/molecule-ai/workspace-hermes:latest", + "gemini-cli": "ghcr.io/molecule-ai/workspace-gemini-cli:latest", +} + +const ( + flyAPIBase = "https://api.machines.dev/v1" + flyDefaultSize = "shared-cpu-1x" +) + +// FlyProvisioner provisions workspace agents as Fly Machines instead of +// local Docker containers. Used on SaaS tenants where no Docker daemon +// is available. Set CONTAINER_BACKEND=flyio to activate. +type FlyProvisioner struct { + token string // FLY_API_TOKEN + appID string // Fly app to create machines in (FLY_APP) + region string // Fly region (FLY_REGION, default "ord") +} + +// NewFlyProvisioner creates a provisioner that manages workspaces as Fly Machines. +func NewFlyProvisioner() (*FlyProvisioner, error) { + token := os.Getenv("FLY_API_TOKEN") + if token == "" { + return nil, fmt.Errorf("FLY_API_TOKEN required for Fly provisioner") + } + appID := os.Getenv("FLY_WORKSPACE_APP") + if appID == "" { + return nil, fmt.Errorf("FLY_WORKSPACE_APP required (Fly app for workspace machines)") + } + region := os.Getenv("FLY_REGION") + if region == "" { + region = "ord" + } + return &FlyProvisioner{token: token, appID: appID, region: region}, nil +} + +// flyMachineRequest is the payload for POST /apps/:app/machines. +type flyMachineRequest struct { + Name string `json:"name"` + Region string `json:"region"` + Config flyMachineConfig `json:"config"` +} + +type flyMachineConfig struct { + Image string `json:"image"` + Env map[string]string `json:"env"` + Services []flyService `json:"services,omitempty"` + Guest *flyGuest `json:"guest,omitempty"` +} + +type flyService struct { + Ports []flyPort `json:"ports"` + Protocol string `json:"protocol"` + InternalPort int `json:"internal_port"` +} + +type flyPort struct { + Port int `json:"port"` + Handlers []string `json:"handlers"` +} + +type flyGuest struct { + CPUKind string `json:"cpu_kind"` + CPUs int `json:"cpus"` + MemoryMB int `json:"memory_mb"` +} + +type flyMachineResponse struct { + ID string `json:"id"` + Name string `json:"name"` + State string `json:"state"` + InstanceID string `json:"instance_id"` + PrivateIP string `json:"private_ip"` +} + +// Start creates and starts a Fly Machine for the workspace. +func (p *FlyProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) { + image := FlyRuntimeImages[cfg.Runtime] + if image == "" { + image = FlyRuntimeImages["langgraph"] + } + + name := ContainerName(cfg.WorkspaceID) + + env := map[string]string{ + "WORKSPACE_ID": cfg.WorkspaceID, + "PLATFORM_URL": cfg.PlatformURL, + "PORT": DefaultPort, + } + if cfg.AwarenessURL != "" { + env["AWARENESS_URL"] = cfg.AwarenessURL + } + if cfg.AwarenessNamespace != "" { + env["AWARENESS_NAMESPACE"] = cfg.AwarenessNamespace + } + // Merge additional env vars (API keys, secrets) + for k, v := range cfg.EnvVars { + env[k] = v + } + + memMB := 512 + cpus := 1 + switch cfg.Tier { + case 3: + memMB = 2048 + cpus = 2 + case 4: + memMB = 4096 + cpus = 4 + } + + req := flyMachineRequest{ + Name: name, + Region: p.region, + Config: flyMachineConfig{ + Image: image, + Env: env, + Services: []flyService{ + { + InternalPort: 8000, + Protocol: "tcp", + Ports: []flyPort{ + {Port: 443, Handlers: []string{"tls", "http"}}, + }, + }, + }, + Guest: &flyGuest{ + CPUKind: "shared", + CPUs: cpus, + MemoryMB: memMB, + }, + }, + } + + body, err := json.Marshal(req) + if err != nil { + return "", fmt.Errorf("fly: marshal request: %w", err) + } + + url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID) + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return "", fmt.Errorf("fly: create request: %w", err) + } + httpReq.Header.Set("Authorization", "Bearer "+p.token) + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return "", fmt.Errorf("fly: send request: %w", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return "", fmt.Errorf("fly: create machine failed (%d): %s", resp.StatusCode, string(respBody)) + } + + var machine flyMachineResponse + if err := json.Unmarshal(respBody, &machine); err != nil { + return "", fmt.Errorf("fly: parse response: %w", err) + } + + log.Printf("Fly provisioner: created machine %s (%s) for workspace %s in %s", + machine.ID, machine.Name, cfg.WorkspaceID, p.region) + + return machine.ID, nil +} + +// Stop destroys the Fly Machine for a workspace. +func (p *FlyProvisioner) Stop(ctx context.Context, workspaceID string) error { + machineID, err := p.findMachine(ctx, workspaceID) + if err != nil { + return err + } + if machineID == "" { + return nil // already gone + } + + url := fmt.Sprintf("%s/apps/%s/machines/%s?force=true", flyAPIBase, p.appID, machineID) + req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil) + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("fly: delete machine: %w", err) + } + resp.Body.Close() + + log.Printf("Fly provisioner: deleted machine %s for workspace %s", machineID, workspaceID) + return nil +} + +// IsRunning checks if the workspace's Fly Machine is in "started" state. +func (p *FlyProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) { + machineID, err := p.findMachine(ctx, workspaceID) + if err != nil { + return false, err + } + if machineID == "" { + return false, nil + } + + url := fmt.Sprintf("%s/apps/%s/machines/%s", flyAPIBase, p.appID, machineID) + req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + + var machine flyMachineResponse + json.NewDecoder(resp.Body).Decode(&machine) + return machine.State == "started", nil +} + +// Restart stops and re-creates the machine. +func (p *FlyProvisioner) Restart(ctx context.Context, workspaceID string, cfg WorkspaceConfig) error { + machineID, err := p.findMachine(ctx, workspaceID) + if err != nil { + return err + } + if machineID != "" { + // Restart existing machine + url := fmt.Sprintf("%s/apps/%s/machines/%s/restart", flyAPIBase, p.appID, machineID) + req, _ := http.NewRequestWithContext(ctx, "POST", url, nil) + req.Header.Set("Authorization", "Bearer "+p.token) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("fly: restart machine: %w", err) + } + resp.Body.Close() + log.Printf("Fly provisioner: restarted machine %s for workspace %s", machineID, workspaceID) + return nil + } + // Machine doesn't exist — create it + _, err = p.Start(ctx, cfg) + return err +} + +// findMachine looks up the Fly Machine for a workspace by name. +func (p *FlyProvisioner) findMachine(ctx context.Context, workspaceID string) (string, error) { + name := ContainerName(workspaceID) + url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID) + req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) + req.Header.Set("Authorization", "Bearer "+p.token) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("fly: list machines: %w", err) + } + defer resp.Body.Close() + + var machines []flyMachineResponse + json.NewDecoder(resp.Body).Decode(&machines) + + for _, m := range machines { + if m.Name == name { + return m.ID, nil + } + } + return "", nil +} + +// Close is a no-op for the Fly provisioner (no persistent connections). +func (p *FlyProvisioner) Close() error { return nil }