From b6e039cb49a71f486de74c5c45a9eeb3d73f8970 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Thu, 16 Apr 2026 12:04:37 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20code=20review=20findings=20=E2=80=94=20d?= =?UTF-8?q?ead=20code,=20DRY,=20rate=20limit,=20docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Delete fly_provisioner.go — superseded by control plane architecture. Direct Fly provisioning from tenant was intentionally removed. 2. Extract loadWorkspaceSecrets() — shared by Docker + CP provisioner paths. Eliminates 30-line secret-loading duplication. 3. Token rate limit — max 50 active tokens per workspace. Returns 429 if exceeded. Prevents unbounded token creation by compromised client. 4. CLAUDE.md — add GET/POST/DELETE /workspaces/:id/tokens to route table. 5. .env.example — document MOLECULE_ORG_ID and CP_PROVISION_URL. Co-Authored-By: Claude Opus 4.6 (1M context) --- .env.example | 2 + CLAUDE.md | 2 + platform/internal/handlers/tokens.go | 15 + .../internal/handlers/workspace_provision.go | 43 +-- .../internal/provisioner/fly_provisioner.go | 292 ------------------ 5 files changed, 43 insertions(+), 311 deletions(-) delete mode 100644 platform/internal/provisioner/fly_provisioner.go diff --git a/.env.example b/.env.example index 9e4c6599..3a8b39c9 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,8 @@ PLUGINS_DIR= # Path to plugins/ directory (default: /plugins i # WORKSPACE_DIR= # Optional global host path bind-mounted to /workspace in every container. Per-workspace workspace_dir column overrides this; if neither is set each workspace gets an isolated Docker named volume. # MOLECULE_ENV=development # Environment label (development/staging/production). Used for log tagging and conditional behaviour. # MOLECULE_ENABLE_TEST_TOKENS= # Set to 1 to expose GET /admin/workspaces/:id/test-token (mints a fresh bearer token for E2E scripts). The route is auto-enabled when MOLECULE_ENV != production; this flag is the explicit override. Leave unset/0 in prod — the route 404s unless enabled. +# MOLECULE_ORG_ID= # SaaS only: org UUID set by control plane on tenant machines. When set, workspace provisioning auto-routes through the control plane API instead of Docker. +# CP_PROVISION_URL= # Override control plane URL for workspace provisioning (default: https://api.moleculesai.app). Only needed for testing against a non-production control plane. # CORS / rate limiting # CORS_ORIGINS=http://localhost:3000,http://localhost:3001 # Comma-separated allowed origins for the HTTP API. diff --git a/CLAUDE.md b/CLAUDE.md index c5333d67..27c377a0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -438,6 +438,8 @@ Three Gin middleware classes gate server-side routes — pick the right one. Ful | GET/POST/DELETE | /workspaces/:id/plugins[/:name] | plugins.go — list, install (`{"source":"scheme://spec"}`), uninstall per-workspace | | GET | /workspaces/:id/plugins/available | plugins.go (filtered by workspace runtime) | | GET | /workspaces/:id/plugins/compatibility?runtime=X | plugins.go (preflight runtime-change check) | +| GET/POST | /workspaces/:id/tokens | tokens.go — list active tokens (prefix + metadata), create new token (plaintext returned once). Max 50 per workspace. | +| DELETE | /workspaces/:id/tokens/:tokenId | tokens.go — revoke specific token by ID | | GET | /bundles/export/:id | bundle.go — `AdminAuth` (#165 / PR #167) | | POST | /bundles/import | bundle.go — `AdminAuth` (#164 CRITICAL / PR #167) | | GET | /org/templates | org.go (list available org templates) | diff --git a/platform/internal/handlers/tokens.go b/platform/internal/handlers/tokens.go index 765c36a1..e63eff29 100644 --- a/platform/internal/handlers/tokens.go +++ b/platform/internal/handlers/tokens.go @@ -1,6 +1,7 @@ package handlers import ( + "fmt" "log" "net/http" "strconv" @@ -73,11 +74,25 @@ func (h *TokenHandler) List(c *gin.Context) { }) } +// maxTokensPerWorkspace prevents unbounded token creation. 50 is generous — +// most workspaces need 1-3 tokens (primary + rotation spare). +const maxTokensPerWorkspace = 50 + // Create mints a new token for the workspace. The plaintext is returned // exactly once in the response — it cannot be recovered afterwards. func (h *TokenHandler) Create(c *gin.Context) { workspaceID := c.Param("id") + // Rate limit: max active tokens per workspace + var count int + db.DB.QueryRowContext(c.Request.Context(), + `SELECT COUNT(*) FROM workspace_auth_tokens WHERE workspace_id = $1 AND revoked_at IS NULL`, + workspaceID).Scan(&count) + if count >= maxTokensPerWorkspace { + c.JSON(http.StatusTooManyRequests, gin.H{"error": fmt.Sprintf("maximum %d active tokens per workspace", maxTokensPerWorkspace)}) + return + } + token, err := wsauth.IssueToken(c.Request.Context(), db.DB, workspaceID) if err != nil { log.Printf("tokens: issue failed for %s: %v", workspaceID, err) diff --git a/platform/internal/handlers/workspace_provision.go b/platform/internal/handlers/workspace_provision.go index 9f2ae8bc..b7cea43d 100644 --- a/platform/internal/handlers/workspace_provision.go +++ b/platform/internal/handlers/workspace_provision.go @@ -457,20 +457,10 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model return files } -// provisionWorkspaceFly provisions a workspace as a Fly Machine instead -// of a local Docker container. Same secret-loading + env-var logic as -// provisionWorkspaceFly is removed — use provisionWorkspaceCP instead. -// Direct Fly provisioning from the tenant was replaced by the control -// plane architecture (the CP holds the Fly token, manages billing/quotas). - -// provisionWorkspaceCP provisions a workspace via the control plane API. -// The control plane holds the Fly token and manages billing/quotas — the -// tenant platform never talks to Fly directly. -func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) { - ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout) - defer cancel() - - // Load secrets (same as Docker/Fly paths) +// loadWorkspaceSecrets loads global + workspace-specific secrets into a map. +// Returns nil map + error string on decrypt failure. Shared by both Docker +// and control plane provisioning paths to avoid duplication. +func loadWorkspaceSecrets(ctx context.Context, workspaceID string) (map[string]string, string) { envVars := map[string]string{} globalRows, globalErr := db.DB.QueryContext(ctx, `SELECT key, encrypted_value, encryption_version FROM global_secrets`) @@ -483,10 +473,7 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string if globalRows.Scan(&k, &v, &ver) == nil { decrypted, decErr := crypto.DecryptVersioned(v, ver) if decErr != nil { - log.Printf("CPProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr) - db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, - workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k)) - return + return nil, fmt.Sprintf("cannot decrypt global secret %s: %v", k, decErr) } envVars[k] = string(decrypted) } @@ -501,11 +488,29 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string var v []byte var ver int if wsRows.Scan(&k, &v, &ver) == nil { - decrypted, _ := crypto.DecryptVersioned(v, ver) + decrypted, decErr := crypto.DecryptVersioned(v, ver) + if decErr != nil { + return nil, fmt.Sprintf("cannot decrypt workspace secret %s: %v", k, decErr) + } envVars[k] = string(decrypted) } } } + return envVars, "" +} + +// provisionWorkspaceCP provisions a workspace via the control plane API. +func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) { + ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout) + defer cancel() + + envVars, decryptErr := loadWorkspaceSecrets(ctx, workspaceID) + if decryptErr != "" { + log.Printf("CPProvisioner: %s for %s", decryptErr, workspaceID) + db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`, + workspaceID, decryptErr) + return + } applyAgentGitIdentity(envVars, payload.Name) if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil { diff --git a/platform/internal/provisioner/fly_provisioner.go b/platform/internal/provisioner/fly_provisioner.go deleted file mode 100644 index e6868c04..00000000 --- a/platform/internal/provisioner/fly_provisioner.go +++ /dev/null @@ -1,292 +0,0 @@ -package provisioner - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "log" - "net/http" - "os" - "time" -) - -// FlyRuntimeImages maps runtime names to their GHCR image tags for Fly. -// These are the same images as RuntimeImages but use the full registry path -// since Fly machines pull from a registry, not a local Docker daemon. -var FlyRuntimeImages = map[string]string{ - "langgraph": "ghcr.io/molecule-ai/workspace-langgraph:latest", - "claude-code": "ghcr.io/molecule-ai/workspace-claude-code:latest", - "openclaw": "ghcr.io/molecule-ai/workspace-openclaw:latest", - "deepagents": "ghcr.io/molecule-ai/workspace-deepagents:latest", - "crewai": "ghcr.io/molecule-ai/workspace-crewai:latest", - "autogen": "ghcr.io/molecule-ai/workspace-autogen:latest", - "hermes": "ghcr.io/molecule-ai/workspace-hermes:latest", - "gemini-cli": "ghcr.io/molecule-ai/workspace-gemini-cli:latest", -} - -const ( - flyAPIBase = "https://api.machines.dev/v1" - flyDefaultSize = "shared-cpu-1x" -) - -// FlyProvisioner provisions workspace agents as Fly Machines instead of -// local Docker containers. Used on SaaS tenants where no Docker daemon -// is available. Set CONTAINER_BACKEND=flyio to activate. -type FlyProvisioner struct { - token string // FLY_API_TOKEN - appID string // Fly app to create machines in (FLY_APP) - region string // Fly region (FLY_REGION, default "ord") -} - -// NewFlyProvisioner creates a provisioner that manages workspaces as Fly Machines. -func NewFlyProvisioner() (*FlyProvisioner, error) { - token := os.Getenv("FLY_API_TOKEN") - if token == "" { - return nil, fmt.Errorf("FLY_API_TOKEN required for Fly provisioner") - } - appID := os.Getenv("FLY_WORKSPACE_APP") - if appID == "" { - return nil, fmt.Errorf("FLY_WORKSPACE_APP required (Fly app for workspace machines)") - } - region := os.Getenv("FLY_REGION") - if region == "" { - region = "ord" - } - return &FlyProvisioner{token: token, appID: appID, region: region}, nil -} - -// flyMachineRequest is the payload for POST /apps/:app/machines. -type flyMachineRequest struct { - Name string `json:"name"` - Region string `json:"region"` - Config flyMachineConfig `json:"config"` -} - -type flyMachineConfig struct { - Image string `json:"image"` - Env map[string]string `json:"env"` - Services []flyService `json:"services,omitempty"` - Guest *flyGuest `json:"guest,omitempty"` -} - -type flyService struct { - Ports []flyPort `json:"ports"` - Protocol string `json:"protocol"` - InternalPort int `json:"internal_port"` -} - -type flyPort struct { - Port int `json:"port"` - Handlers []string `json:"handlers"` -} - -type flyGuest struct { - CPUKind string `json:"cpu_kind"` - CPUs int `json:"cpus"` - MemoryMB int `json:"memory_mb"` -} - -type flyMachineResponse struct { - ID string `json:"id"` - Name string `json:"name"` - State string `json:"state"` - InstanceID string `json:"instance_id"` - PrivateIP string `json:"private_ip"` -} - -// Start creates and starts a Fly Machine for the workspace. -func (p *FlyProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) { - image := FlyRuntimeImages[cfg.Runtime] - if image == "" { - image = FlyRuntimeImages["langgraph"] - } - - name := ContainerName(cfg.WorkspaceID) - - env := map[string]string{ - "WORKSPACE_ID": cfg.WorkspaceID, - "PLATFORM_URL": cfg.PlatformURL, - "PORT": DefaultPort, - } - if cfg.AwarenessURL != "" { - env["AWARENESS_URL"] = cfg.AwarenessURL - } - if cfg.AwarenessNamespace != "" { - env["AWARENESS_NAMESPACE"] = cfg.AwarenessNamespace - } - // Merge additional env vars (API keys, secrets) - for k, v := range cfg.EnvVars { - env[k] = v - } - - memMB := 512 - cpus := 1 - switch cfg.Tier { - case 3: - memMB = 2048 - cpus = 2 - case 4: - memMB = 4096 - cpus = 4 - } - - req := flyMachineRequest{ - Name: name, - Region: p.region, - Config: flyMachineConfig{ - Image: image, - Env: env, - Services: []flyService{ - { - InternalPort: 8000, - Protocol: "tcp", - Ports: []flyPort{ - {Port: 443, Handlers: []string{"tls", "http"}}, - }, - }, - }, - Guest: &flyGuest{ - CPUKind: "shared", - CPUs: cpus, - MemoryMB: memMB, - }, - }, - } - - body, err := json.Marshal(req) - if err != nil { - return "", fmt.Errorf("fly: marshal request: %w", err) - } - - url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID) - httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) - if err != nil { - return "", fmt.Errorf("fly: create request: %w", err) - } - httpReq.Header.Set("Authorization", "Bearer "+p.token) - httpReq.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(httpReq) - if err != nil { - return "", fmt.Errorf("fly: send request: %w", err) - } - defer resp.Body.Close() - - respBody, _ := io.ReadAll(resp.Body) - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return "", fmt.Errorf("fly: create machine failed (%d): %s", resp.StatusCode, string(respBody)) - } - - var machine flyMachineResponse - if err := json.Unmarshal(respBody, &machine); err != nil { - return "", fmt.Errorf("fly: parse response: %w", err) - } - - log.Printf("Fly provisioner: created machine %s (%s) for workspace %s in %s", - machine.ID, machine.Name, cfg.WorkspaceID, p.region) - - return machine.ID, nil -} - -// Stop destroys the Fly Machine for a workspace. -func (p *FlyProvisioner) Stop(ctx context.Context, workspaceID string) error { - machineID, err := p.findMachine(ctx, workspaceID) - if err != nil { - return err - } - if machineID == "" { - return nil // already gone - } - - url := fmt.Sprintf("%s/apps/%s/machines/%s?force=true", flyAPIBase, p.appID, machineID) - req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil) - req.Header.Set("Authorization", "Bearer "+p.token) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("fly: delete machine: %w", err) - } - resp.Body.Close() - - log.Printf("Fly provisioner: deleted machine %s for workspace %s", machineID, workspaceID) - return nil -} - -// IsRunning checks if the workspace's Fly Machine is in "started" state. -func (p *FlyProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) { - machineID, err := p.findMachine(ctx, workspaceID) - if err != nil { - return false, err - } - if machineID == "" { - return false, nil - } - - url := fmt.Sprintf("%s/apps/%s/machines/%s", flyAPIBase, p.appID, machineID) - req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) - req.Header.Set("Authorization", "Bearer "+p.token) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return false, err - } - defer resp.Body.Close() - - var machine flyMachineResponse - json.NewDecoder(resp.Body).Decode(&machine) - return machine.State == "started", nil -} - -// Restart stops and re-creates the machine. -func (p *FlyProvisioner) Restart(ctx context.Context, workspaceID string, cfg WorkspaceConfig) error { - machineID, err := p.findMachine(ctx, workspaceID) - if err != nil { - return err - } - if machineID != "" { - // Restart existing machine - url := fmt.Sprintf("%s/apps/%s/machines/%s/restart", flyAPIBase, p.appID, machineID) - req, _ := http.NewRequestWithContext(ctx, "POST", url, nil) - req.Header.Set("Authorization", "Bearer "+p.token) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("fly: restart machine: %w", err) - } - resp.Body.Close() - log.Printf("Fly provisioner: restarted machine %s for workspace %s", machineID, workspaceID) - return nil - } - // Machine doesn't exist — create it - _, err = p.Start(ctx, cfg) - return err -} - -// findMachine looks up the Fly Machine for a workspace by name. -func (p *FlyProvisioner) findMachine(ctx context.Context, workspaceID string) (string, error) { - name := ContainerName(workspaceID) - url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID) - req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) - req.Header.Set("Authorization", "Bearer "+p.token) - - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - return "", fmt.Errorf("fly: list machines: %w", err) - } - defer resp.Body.Close() - - var machines []flyMachineResponse - json.NewDecoder(resp.Body).Decode(&machines) - - for _, m := range machines { - if m.Name == name { - return m.ID, nil - } - } - return "", nil -} - -// Close is a no-op for the Fly provisioner (no persistent connections). -func (p *FlyProvisioner) Close() error { return nil }