Merge pull request #501 from Molecule-AI/feat/fly-provisioner

feat(platform): Fly Machines provisioner (CONTAINER_BACKEND=flyio)
This commit is contained in:
Hongming Wang 2026-04-16 11:05:52 -07:00 committed by GitHub
commit bb25d54daa
4 changed files with 416 additions and 8 deletions

View File

@ -100,13 +100,27 @@ func main() {
}
}()
// Provisioner (optional — gracefully degrades if Docker not available)
// Provisioner — select backend based on CONTAINER_BACKEND env var.
// "flyio" → Fly Machines API (SaaS tenants). Anything else → Docker (default).
var prov *provisioner.Provisioner
if p, err := provisioner.New(); err != nil {
log.Printf("Provisioner disabled (Docker not available): %v", err)
} else {
prov = p
defer prov.Close()
var flyProv *provisioner.FlyProvisioner
switch os.Getenv("CONTAINER_BACKEND") {
case "flyio":
if fp, err := provisioner.NewFlyProvisioner(); err != nil {
log.Printf("Fly provisioner failed: %v", err)
} else {
flyProv = fp
defer flyProv.Close()
log.Printf("Provisioner: Fly Machines (app=%s)", os.Getenv("FLY_WORKSPACE_APP"))
}
default:
if p, err := provisioner.New(); err != nil {
log.Printf("Provisioner disabled (Docker not available): %v", err)
} else {
prov = p
defer prov.Close()
log.Println("Provisioner: Docker")
}
}
port := envOr("PORT", "8080")
@ -117,6 +131,9 @@ func main() {
// WorkspaceHandler is created before the router so RestartByID can be wired into
// the offline callbacks used by both the liveness monitor and the health sweep.
wh := handlers.NewWorkspaceHandler(broadcaster, prov, platformURL, configsDir)
if flyProv != nil {
wh.SetFlyProvisioner(flyProv)
}
// Offline handler: broadcast event + auto-restart the dead workspace
onWorkspaceOffline := func(innerCtx context.Context, workspaceID string) {

View File

@ -24,6 +24,7 @@ import (
type WorkspaceHandler struct {
broadcaster *events.Broadcaster
provisioner *provisioner.Provisioner
flyProv *provisioner.FlyProvisioner
platformURL string
configsDir string // path to workspace-configs-templates/ (for reading templates)
// envMutators runs registered EnvMutator plugins right before
@ -42,6 +43,13 @@ func NewWorkspaceHandler(b *events.Broadcaster, p *provisioner.Provisioner, plat
}
}
// SetFlyProvisioner wires the Fly Machines provisioner into the handler.
// When a non-nil provisioner has been set, Create provisions new workspaces
// through provisionWorkspaceFly (Fly Machines) in preference to the local
// Docker provisioner.
//
// NOTE(review): the field is assigned without synchronization. main.go calls
// this once right after constructing the handler — presumably before any
// request is served; confirm.
func (h *WorkspaceHandler) SetFlyProvisioner(fp *provisioner.FlyProvisioner) {
	h.flyProv = fp
}
// SetEnvMutators wires a provisionhook.Registry into the handler. Plugins
// living in separate repos register on the same Registry instance during
// boot (see cmd/server/main.go) and main.go calls this setter once before
@ -193,8 +201,10 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
configFiles = h.ensureDefaultConfig(id, payload)
}
// Auto-provision — start a container
if h.provisioner != nil {
// Auto-provision — start a container (Docker) or Fly Machine
if h.flyProv != nil {
go h.provisionWorkspaceFly(id, templatePath, configFiles, payload)
} else if h.provisioner != nil {
go h.provisionWorkspace(id, templatePath, configFiles, payload)
} else {
// No Docker available (SaaS tenant). Persist basic config as JSON

View File

@ -456,3 +456,92 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
log.Printf("Provisioner: generated %d config files for workspace %s (runtime: %s)", len(files), workspaceID, runtime)
return files
}
// provisionWorkspaceFly provisions a workspace as a Fly Machine instead
// of a local Docker container.
//
// Steps, in order:
//  1. Load and decrypt global secrets, then workspace secrets, into the
//     machine environment. Workspace secrets are loaded second, so they
//     override a global secret with the same key.
//  2. Apply the agent git identity and run the registered env mutators.
//  3. Call FlyProvisioner.Start with the assembled WorkspaceConfig.
//  4. Issue an auth token for the workspace.
//
// The function returns nothing (Create launches it in a goroutine). Failure
// is reported by setting the workspace row to status 'failed' with the
// reason in last_sample_error; a failed Start additionally broadcasts
// WORKSPACE_PROVISION_FAILED.
func (h *WorkspaceHandler) provisionWorkspaceFly(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
	ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
	defer cancel()

	// markFailed records the failure on the workspace row. It deliberately
	// uses a fresh context: when provisioning failed because ctx expired, an
	// UPDATE issued on ctx would fail as well and the failure would never be
	// recorded.
	markFailed := func(reason string) {
		failCtx, failCancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
		defer failCancel()
		if _, execErr := db.DB.ExecContext(failCtx,
			`UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
			workspaceID, reason); execErr != nil {
			log.Printf("FlyProvisioner: failed to mark workspace %s as failed: %v", workspaceID, execErr)
		}
	}

	envVars := map[string]string{}

	// loadSecrets decrypts every row returned by query into envVars. It
	// returns false only when a secret cannot be decrypted, in which case the
	// workspace has already been marked failed and provisioning must stop:
	// booting an agent with an empty credential is worse than not booting it.
	// Query, scan and iteration errors stay best-effort (logged, then
	// skipped), as in the original code, which ignored them silently.
	loadSecrets := func(scope, query string, args ...interface{}) bool {
		rows, queryErr := db.DB.QueryContext(ctx, query, args...)
		if queryErr != nil {
			log.Printf("FlyProvisioner: failed to query %s secrets for %s: %v", scope, workspaceID, queryErr)
			return true
		}
		defer rows.Close()

		for rows.Next() {
			var (
				key        string
				ciphertext []byte
				version    int
			)
			if scanErr := rows.Scan(&key, &ciphertext, &version); scanErr != nil {
				log.Printf("FlyProvisioner: failed to scan %s secret row for %s: %v", scope, workspaceID, scanErr)
				continue
			}
			plaintext, decErr := crypto.DecryptVersioned(ciphertext, version)
			if decErr != nil {
				log.Printf("FlyProvisioner: failed to decrypt %s secret %s for %s: %v", scope, key, workspaceID, decErr)
				markFailed(fmt.Sprintf("cannot decrypt %s secret %s", scope, key))
				return false
			}
			envVars[key] = string(plaintext)
		}
		if rowsErr := rows.Err(); rowsErr != nil {
			log.Printf("FlyProvisioner: error while reading %s secrets for %s: %v", scope, workspaceID, rowsErr)
		}
		return true
	}

	if !loadSecrets("global",
		`SELECT key, encrypted_value, encryption_version FROM global_secrets`) {
		return
	}
	if !loadSecrets("workspace",
		`SELECT key, encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1`,
		workspaceID) {
		return
	}

	applyAgentGitIdentity(envVars, payload.Name)

	if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil {
		log.Printf("FlyProvisioner: env mutator failed for %s: %v", workspaceID, err)
		markFailed(err.Error())
		return
	}

	awarenessNamespace := h.loadAwarenessNamespace(ctx, workspaceID)

	cfg := provisioner.WorkspaceConfig{
		WorkspaceID:        workspaceID,
		TemplatePath:       templatePath,
		ConfigFiles:        configFiles,
		Tier:               payload.Tier,
		Runtime:            payload.Runtime,
		EnvVars:            envVars,
		PlatformURL:        h.platformURL,
		AwarenessNamespace: awarenessNamespace,
	}

	machineID, err := h.flyProv.Start(ctx, cfg)
	if err != nil {
		log.Printf("FlyProvisioner: failed to start workspace %s: %v", workspaceID, err)
		// Same reasoning as markFailed: ctx may already be expired, so the
		// failure event is published on a fresh context.
		eventCtx, eventCancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
		h.broadcaster.RecordAndBroadcast(eventCtx, "WORKSPACE_PROVISION_FAILED", workspaceID, map[string]interface{}{
			"error": err.Error(),
		})
		eventCancel()
		markFailed(err.Error())
		return
	}

	log.Printf("FlyProvisioner: workspace %s started as Fly machine %s", workspaceID, machineID)

	// The workspace will register via POST /registry/register once the
	// agent boots inside the Fly machine, which transitions it to 'online'.
	// We issue a token preemptively so the agent can authenticate.
	// NOTE(review): the token is issued after the machine was created and is
	// only logged (by prefix) here; nothing in this function delivers it to
	// the machine — confirm how the agent is expected to obtain it.
	token, tokenErr := wsauth.IssueToken(ctx, db.DB, workspaceID)
	if tokenErr != nil {
		log.Printf("FlyProvisioner: failed to issue token for %s: %v", workspaceID, tokenErr)
		return
	}
	// Guard the slice: indexing token[:8] panics on a token shorter than 8.
	prefix := token
	if len(prefix) > 8 {
		prefix = prefix[:8]
	}
	log.Printf("FlyProvisioner: issued auth token for workspace %s (prefix: %s...)", workspaceID, prefix)
}

View File

@ -0,0 +1,292 @@
package provisioner
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
)
// FlyRuntimeImages maps a workspace runtime name to the fully-qualified
// GHCR image reference used when creating its Fly Machine. Full registry
// paths are required because Fly pulls images from a registry rather than
// from a local Docker daemon. Start falls back to the "langgraph" entry for
// runtimes that are not listed here.
//
// NOTE(review): intended to mirror RuntimeImages (declared outside this
// file) — keep the two in sync.
var FlyRuntimeImages = map[string]string{
	"autogen":     "ghcr.io/molecule-ai/workspace-autogen:latest",
	"claude-code": "ghcr.io/molecule-ai/workspace-claude-code:latest",
	"crewai":      "ghcr.io/molecule-ai/workspace-crewai:latest",
	"deepagents":  "ghcr.io/molecule-ai/workspace-deepagents:latest",
	"gemini-cli":  "ghcr.io/molecule-ai/workspace-gemini-cli:latest",
	"hermes":      "ghcr.io/molecule-ai/workspace-hermes:latest",
	"langgraph":   "ghcr.io/molecule-ai/workspace-langgraph:latest",
	"openclaw":    "ghcr.io/molecule-ai/workspace-openclaw:latest",
}
const (
	// flyAPIBase is the root URL every Fly Machines API request in this
	// file is built from (no trailing slash).
	flyAPIBase = "https://api.machines.dev/v1"
	// flyDefaultSize names a Fly machine size preset.
	// NOTE(review): not referenced by the code in this file — Start sets
	// the guest CPU kind, CPU count and memory explicitly. Confirm whether
	// it is still needed.
	flyDefaultSize = "shared-cpu-1x"
)
// FlyProvisioner provisions workspace agents as Fly Machines instead of
// local Docker containers. Used on SaaS tenants where no Docker daemon
// is available. Set CONTAINER_BACKEND=flyio to activate.
//
// All fields are populated from the environment by NewFlyProvisioner.
type FlyProvisioner struct {
	token  string // FLY_API_TOKEN; sent as a Bearer token on every API request
	appID  string // Fly app to create machines in (FLY_WORKSPACE_APP)
	region string // Fly region (FLY_REGION, default "ord")
}
// NewFlyProvisioner builds a FlyProvisioner from the process environment.
//
// FLY_API_TOKEN and FLY_WORKSPACE_APP are mandatory; an error is returned
// when either is empty (the token is checked first). FLY_REGION is optional
// and falls back to "ord".
func NewFlyProvisioner() (*FlyProvisioner, error) {
	apiToken := os.Getenv("FLY_API_TOKEN")
	workspaceApp := os.Getenv("FLY_WORKSPACE_APP")

	switch {
	case apiToken == "":
		return nil, fmt.Errorf("FLY_API_TOKEN required for Fly provisioner")
	case workspaceApp == "":
		return nil, fmt.Errorf("FLY_WORKSPACE_APP required (Fly app for workspace machines)")
	}

	fp := &FlyProvisioner{
		token:  apiToken,
		appID:  workspaceApp,
		region: "ord",
	}
	if configured := os.Getenv("FLY_REGION"); configured != "" {
		fp.region = configured
	}
	return fp, nil
}
// flyMachineRequest is the payload for POST /apps/:app/machines.
// Start marshals it to JSON as the request body when creating a machine.
type flyMachineRequest struct {
	// Name is the machine name. Start sets it to ContainerName(workspaceID),
	// which is also the value findMachine matches on.
	Name string `json:"name"`
	// Region is the Fly region the machine is created in.
	Region string `json:"region"`
	// Config carries the image, environment, services and guest sizing.
	Config flyMachineConfig `json:"config"`
}
// flyMachineConfig is the "config" object of a machine-create request.
type flyMachineConfig struct {
	// Image is the container image reference the machine runs.
	Image string `json:"image"`
	// Env is the environment passed to the machine; Start fills it with the
	// platform variables plus the workspace's decrypted secrets.
	Env map[string]string `json:"env"`
	// Services lists the exposed services; omitted from the JSON when empty.
	Services []flyService `json:"services,omitempty"`
	// Guest is the CPU/memory sizing; omitted from the JSON when nil.
	Guest *flyGuest `json:"guest,omitempty"`
}
// flyService is one entry of flyMachineConfig.Services. Start sends a
// single "tcp" service with internal port 8000 and one external port.
type flyService struct {
	Ports        []flyPort `json:"ports"`
	Protocol     string    `json:"protocol"`
	InternalPort int       `json:"internal_port"`
}
// flyPort is one entry of flyService.Ports: a port number and the handlers
// applied to it. Start uses port 443 with the "tls" and "http" handlers.
type flyPort struct {
	Port     int      `json:"port"`
	Handlers []string `json:"handlers"`
}
// flyGuest is the sizing section of a machine config. Start always sends
// CPUKind "shared" and derives CPUs and MemoryMB from the workspace tier.
type flyGuest struct {
	CPUKind  string `json:"cpu_kind"`
	CPUs     int    `json:"cpus"`
	MemoryMB int    `json:"memory_mb"`
}
// flyMachineResponse is the subset of a Fly machine object this file
// decodes from create, get and list responses. Only ID, Name and State are
// read by the code in this file; InstanceID and PrivateIP are decoded but
// not used here.
type flyMachineResponse struct {
	ID         string `json:"id"`
	Name       string `json:"name"`
	State      string `json:"state"` // IsRunning compares this against "started"
	InstanceID string `json:"instance_id"`
	PrivateIP  string `json:"private_ip"`
}
// Start creates and starts a Fly Machine for the workspace and returns the
// new machine's ID.
//
// The image is looked up in FlyRuntimeImages by cfg.Runtime, falling back to
// the "langgraph" image for unknown runtimes. The machine is named
// ContainerName(cfg.WorkspaceID) so that findMachine can locate it later.
// Sizing is 512 MB / 1 shared CPU by default, 2048 MB / 2 CPUs for tier 3
// and 4096 MB / 4 CPUs for tier 4.
//
// An error is returned when the request cannot be built or sent, when the
// response cannot be read or parsed, when the API answers with anything
// other than 200/201, or when the response carries no machine ID.
func (p *FlyProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) {
	image := FlyRuntimeImages[cfg.Runtime]
	if image == "" {
		image = FlyRuntimeImages["langgraph"]
	}

	name := ContainerName(cfg.WorkspaceID)

	env := make(map[string]string, len(cfg.EnvVars)+5)
	env["WORKSPACE_ID"] = cfg.WorkspaceID
	env["PLATFORM_URL"] = cfg.PlatformURL
	env["PORT"] = DefaultPort
	if cfg.AwarenessURL != "" {
		env["AWARENESS_URL"] = cfg.AwarenessURL
	}
	if cfg.AwarenessNamespace != "" {
		env["AWARENESS_NAMESPACE"] = cfg.AwarenessNamespace
	}
	// Merge additional env vars (API keys, secrets). These are applied last,
	// so a secret that reuses one of the keys above replaces it.
	for k, v := range cfg.EnvVars {
		env[k] = v
	}

	memMB := 512
	cpus := 1
	switch cfg.Tier {
	case 3:
		memMB = 2048
		cpus = 2
	case 4:
		memMB = 4096
		cpus = 4
	}

	req := flyMachineRequest{
		Name:   name,
		Region: p.region,
		Config: flyMachineConfig{
			Image: image,
			Env:   env,
			Services: []flyService{
				{
					// NOTE(review): hard-coded, while the agent is told to
					// listen on DefaultPort via PORT above — confirm the
					// two always agree.
					InternalPort: 8000,
					Protocol:     "tcp",
					Ports: []flyPort{
						{Port: 443, Handlers: []string{"tls", "http"}},
					},
				},
			},
			Guest: &flyGuest{
				CPUKind:  "shared",
				CPUs:     cpus,
				MemoryMB: memMB,
			},
		},
	}

	body, err := json.Marshal(req)
	if err != nil {
		return "", fmt.Errorf("fly: marshal request: %w", err)
	}

	endpoint := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("fly: create request: %w", err)
	}
	httpReq.Header.Set("Authorization", "Bearer "+p.token)
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return "", fmt.Errorf("fly: send request: %w", err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("fly: read response (%d): %w", resp.StatusCode, err)
	}

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		return "", fmt.Errorf("fly: create machine failed (%d): %s", resp.StatusCode, string(respBody))
	}

	var machine flyMachineResponse
	if err := json.Unmarshal(respBody, &machine); err != nil {
		return "", fmt.Errorf("fly: parse response: %w", err)
	}
	// A success status without an ID would hand the caller an unusable
	// handle; report it instead of returning "" with a nil error.
	if machine.ID == "" {
		return "", fmt.Errorf("fly: create machine response for workspace %s has no machine id", cfg.WorkspaceID)
	}

	log.Printf("Fly provisioner: created machine %s (%s) for workspace %s in %s",
		machine.ID, machine.Name, cfg.WorkspaceID, p.region)

	return machine.ID, nil
}
// Stop destroys the Fly Machine for a workspace (force delete).
//
// It returns nil when no machine exists for the workspace, or when the API
// answers 404 to the delete (the machine disappeared between lookup and
// delete). Any other non-2xx status is returned as an error, so callers no
// longer see a failed delete reported as success.
func (p *FlyProvisioner) Stop(ctx context.Context, workspaceID string) error {
	machineID, err := p.findMachine(ctx, workspaceID)
	if err != nil {
		return err
	}
	if machineID == "" {
		return nil // already gone
	}

	endpoint := fmt.Sprintf("%s/apps/%s/machines/%s?force=true", flyAPIBase, p.appID, machineID)
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, nil)
	if err != nil {
		return fmt.Errorf("fly: create delete request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+p.token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("fly: delete machine: %w", err)
	}
	defer resp.Body.Close()

	// Read error ignored on purpose: the body is only used to enrich the
	// error message below.
	respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))

	switch {
	case resp.StatusCode == http.StatusNotFound:
		log.Printf("Fly provisioner: machine %s for workspace %s was already gone", machineID, workspaceID)
		return nil
	case resp.StatusCode < 200 || resp.StatusCode > 299:
		return fmt.Errorf("fly: delete machine %s failed (%d): %s", machineID, resp.StatusCode, string(respBody))
	}

	log.Printf("Fly provisioner: deleted machine %s for workspace %s", machineID, workspaceID)
	return nil
}
// IsRunning checks if the workspace's Fly Machine is in "started" state.
//
// It returns (false, nil) when no machine exists for the workspace or the
// API answers 404 for it. API failures (non-200 status, undecodable body)
// are returned as errors rather than being reported as "not running", so a
// transient Fly outage cannot be mistaken for a dead workspace.
func (p *FlyProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
	machineID, err := p.findMachine(ctx, workspaceID)
	if err != nil {
		return false, err
	}
	if machineID == "" {
		return false, nil
	}

	endpoint := fmt.Sprintf("%s/apps/%s/machines/%s", flyAPIBase, p.appID, machineID)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return false, fmt.Errorf("fly: create get request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+p.token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// fall through to decoding below
	case http.StatusNotFound:
		return false, nil
	default:
		// Read error ignored on purpose: the body only enriches the message.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return false, fmt.Errorf("fly: get machine %s failed (%d): %s", machineID, resp.StatusCode, string(respBody))
	}

	var machine flyMachineResponse
	if err := json.NewDecoder(resp.Body).Decode(&machine); err != nil {
		return false, fmt.Errorf("fly: parse machine %s: %w", machineID, err)
	}
	return machine.State == "started", nil
}
// Restart restarts the workspace's existing Fly Machine in place via the
// machine restart endpoint. When no machine exists for the workspace, it
// creates one from cfg with Start instead.
//
// A non-2xx answer from the restart endpoint is returned as an error rather
// than being logged as a successful restart.
func (p *FlyProvisioner) Restart(ctx context.Context, workspaceID string, cfg WorkspaceConfig) error {
	machineID, err := p.findMachine(ctx, workspaceID)
	if err != nil {
		return err
	}

	if machineID == "" {
		// Machine doesn't exist — create it
		_, err = p.Start(ctx, cfg)
		return err
	}

	// Restart existing machine
	endpoint := fmt.Sprintf("%s/apps/%s/machines/%s/restart", flyAPIBase, p.appID, machineID)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
	if err != nil {
		return fmt.Errorf("fly: create restart request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+p.token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("fly: restart machine: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Read error ignored on purpose: the body only enriches the message.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return fmt.Errorf("fly: restart machine %s failed (%d): %s", machineID, resp.StatusCode, string(respBody))
	}

	log.Printf("Fly provisioner: restarted machine %s for workspace %s", machineID, workspaceID)
	return nil
}
// findMachine looks up the Fly Machine for a workspace by name and returns
// its ID. The name matched is ContainerName(workspaceID), the same name
// Start assigns.
//
// It returns ("", nil) only when the machine list was fetched and decoded
// successfully and contains no machine with that name. A non-200 status or
// an undecodable body is an error: treating it as "not found" would make
// Stop silently skip the delete and Restart create a duplicate machine.
//
// The lookup is bounded to 10 seconds on top of whatever deadline ctx
// already carries.
func (p *FlyProvisioner) findMachine(ctx context.Context, workspaceID string) (string, error) {
	name := ContainerName(workspaceID)

	listCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	endpoint := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID)
	req, err := http.NewRequestWithContext(listCtx, http.MethodGet, endpoint, nil)
	if err != nil {
		return "", fmt.Errorf("fly: create list request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+p.token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("fly: list machines: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Read error ignored on purpose: the body only enriches the message.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return "", fmt.Errorf("fly: list machines failed (%d): %s", resp.StatusCode, string(respBody))
	}

	var machines []flyMachineResponse
	if err := json.NewDecoder(resp.Body).Decode(&machines); err != nil {
		return "", fmt.Errorf("fly: parse machine list: %w", err)
	}

	for _, m := range machines {
		if m.Name == name {
			return m.ID, nil
		}
	}
	return "", nil
}
// Close releases nothing and always returns nil: the Fly provisioner holds
// no persistent connections.
func (p *FlyProvisioner) Close() error {
	return nil
}