fix: code review findings — dead code, DRY, rate limit, docs

1. Delete fly_provisioner.go — superseded by control plane architecture.
   Direct Fly provisioning from tenant was intentionally removed.

2. Extract loadWorkspaceSecrets() — shared by Docker + CP provisioner
   paths. Eliminates 30-line secret-loading duplication.

3. Token rate limit — max 50 active tokens per workspace. Returns 429
   if exceeded. Prevents unbounded token creation by compromised client.

4. CLAUDE.md — add GET/POST/DELETE /workspaces/:id/tokens to route table.

5. .env.example — document MOLECULE_ORG_ID and CP_PROVISION_URL.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-16 12:04:37 -07:00
parent 77d42268d4
commit 8f4d0997c8
5 changed files with 43 additions and 311 deletions

View File

@ -17,6 +17,8 @@ PLUGINS_DIR= # Path to plugins/ directory (default: /plugins i
# WORKSPACE_DIR= # Optional global host path bind-mounted to /workspace in every container. Per-workspace workspace_dir column overrides this; if neither is set each workspace gets an isolated Docker named volume.
# MOLECULE_ENV=development # Environment label (development/staging/production). Used for log tagging and conditional behaviour.
# MOLECULE_ENABLE_TEST_TOKENS= # Set to 1 to expose GET /admin/workspaces/:id/test-token (mints a fresh bearer token for E2E scripts). The route is auto-enabled when MOLECULE_ENV != production; this flag is the explicit override. Leave unset/0 in prod — the route 404s unless enabled.
# MOLECULE_ORG_ID= # SaaS only: org UUID set by control plane on tenant machines. When set, workspace provisioning auto-routes through the control plane API instead of Docker.
# CP_PROVISION_URL= # Override control plane URL for workspace provisioning (default: https://api.moleculesai.app). Only needed for testing against a non-production control plane.
# CORS / rate limiting
# CORS_ORIGINS=http://localhost:3000,http://localhost:3001 # Comma-separated allowed origins for the HTTP API.

View File

@ -438,6 +438,8 @@ Three Gin middleware classes gate server-side routes — pick the right one. Ful
| GET/POST/DELETE | /workspaces/:id/plugins[/:name] | plugins.go — list, install (`{"source":"scheme://spec"}`), uninstall per-workspace |
| GET | /workspaces/:id/plugins/available | plugins.go (filtered by workspace runtime) |
| GET | /workspaces/:id/plugins/compatibility?runtime=X | plugins.go (preflight runtime-change check) |
| GET/POST | /workspaces/:id/tokens | tokens.go — list active tokens (prefix + metadata), create new token (plaintext returned once). Max 50 per workspace. |
| DELETE | /workspaces/:id/tokens/:tokenId | tokens.go — revoke specific token by ID |
| GET | /bundles/export/:id | bundle.go — `AdminAuth` (#165 / PR #167) |
| POST | /bundles/import | bundle.go — `AdminAuth` (#164 CRITICAL / PR #167) |
| GET | /org/templates | org.go (list available org templates) |

View File

@ -1,6 +1,7 @@
package handlers
import (
"fmt"
"log"
"net/http"
"strconv"
@ -73,11 +74,25 @@ func (h *TokenHandler) List(c *gin.Context) {
})
}
// maxTokensPerWorkspace prevents unbounded token creation. 50 is generous —
// most workspaces need 1-3 tokens (primary + rotation spare).
const maxTokensPerWorkspace = 50
// Create mints a new token for the workspace. The plaintext is returned
// exactly once in the response — it cannot be recovered afterwards.
func (h *TokenHandler) Create(c *gin.Context) {
workspaceID := c.Param("id")
// Rate limit: max active tokens per workspace
var count int
db.DB.QueryRowContext(c.Request.Context(),
`SELECT COUNT(*) FROM workspace_auth_tokens WHERE workspace_id = $1 AND revoked_at IS NULL`,
workspaceID).Scan(&count)
if count >= maxTokensPerWorkspace {
c.JSON(http.StatusTooManyRequests, gin.H{"error": fmt.Sprintf("maximum %d active tokens per workspace", maxTokensPerWorkspace)})
return
}
token, err := wsauth.IssueToken(c.Request.Context(), db.DB, workspaceID)
if err != nil {
log.Printf("tokens: issue failed for %s: %v", workspaceID, err)

View File

@ -457,20 +457,10 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
return files
}
// provisionWorkspaceFly provisions a workspace as a Fly Machine instead
// of a local Docker container. Same secret-loading + env-var logic as
// provisionWorkspaceFly is removed — use provisionWorkspaceCP instead.
// Direct Fly provisioning from the tenant was replaced by the control
// plane architecture (the CP holds the Fly token, manages billing/quotas).
// provisionWorkspaceCP provisions a workspace via the control plane API.
// The control plane holds the Fly token and manages billing/quotas — the
// tenant platform never talks to Fly directly.
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()
// Load secrets (same as Docker/Fly paths)
// loadWorkspaceSecrets loads global + workspace-specific secrets into a map.
// Returns nil map + error string on decrypt failure. Shared by both Docker
// and control plane provisioning paths to avoid duplication.
func loadWorkspaceSecrets(ctx context.Context, workspaceID string) (map[string]string, string) {
envVars := map[string]string{}
globalRows, globalErr := db.DB.QueryContext(ctx,
`SELECT key, encrypted_value, encryption_version FROM global_secrets`)
@ -483,10 +473,7 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
if globalRows.Scan(&k, &v, &ver) == nil {
decrypted, decErr := crypto.DecryptVersioned(v, ver)
if decErr != nil {
log.Printf("CPProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr)
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k))
return
return nil, fmt.Sprintf("cannot decrypt global secret %s: %v", k, decErr)
}
envVars[k] = string(decrypted)
}
@ -501,11 +488,29 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
var v []byte
var ver int
if wsRows.Scan(&k, &v, &ver) == nil {
decrypted, _ := crypto.DecryptVersioned(v, ver)
decrypted, decErr := crypto.DecryptVersioned(v, ver)
if decErr != nil {
return nil, fmt.Sprintf("cannot decrypt workspace secret %s: %v", k, decErr)
}
envVars[k] = string(decrypted)
}
}
}
return envVars, ""
}
// provisionWorkspaceCP provisions a workspace via the control plane API.
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()
envVars, decryptErr := loadWorkspaceSecrets(ctx, workspaceID)
if decryptErr != "" {
log.Printf("CPProvisioner: %s for %s", decryptErr, workspaceID)
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
workspaceID, decryptErr)
return
}
applyAgentGitIdentity(envVars, payload.Name)
if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil {

View File

@ -1,292 +0,0 @@
package provisioner
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
)
// FlyRuntimeImages maps runtime names to their GHCR image tags for Fly.
// These are the same images as RuntimeImages but use the full registry path
// since Fly machines pull from a registry, not a local Docker daemon.
var FlyRuntimeImages = map[string]string{
"langgraph": "ghcr.io/molecule-ai/workspace-langgraph:latest",
"claude-code": "ghcr.io/molecule-ai/workspace-claude-code:latest",
"openclaw": "ghcr.io/molecule-ai/workspace-openclaw:latest",
"deepagents": "ghcr.io/molecule-ai/workspace-deepagents:latest",
"crewai": "ghcr.io/molecule-ai/workspace-crewai:latest",
"autogen": "ghcr.io/molecule-ai/workspace-autogen:latest",
"hermes": "ghcr.io/molecule-ai/workspace-hermes:latest",
"gemini-cli": "ghcr.io/molecule-ai/workspace-gemini-cli:latest",
}
const (
flyAPIBase = "https://api.machines.dev/v1"
flyDefaultSize = "shared-cpu-1x"
)
// FlyProvisioner provisions workspace agents as Fly Machines instead of
// local Docker containers. Used on SaaS tenants where no Docker daemon
// is available. Set CONTAINER_BACKEND=flyio to activate.
type FlyProvisioner struct {
token string // FLY_API_TOKEN
appID string // Fly app to create machines in (FLY_APP)
region string // Fly region (FLY_REGION, default "ord")
}
// NewFlyProvisioner creates a provisioner that manages workspaces as Fly Machines.
func NewFlyProvisioner() (*FlyProvisioner, error) {
token := os.Getenv("FLY_API_TOKEN")
if token == "" {
return nil, fmt.Errorf("FLY_API_TOKEN required for Fly provisioner")
}
appID := os.Getenv("FLY_WORKSPACE_APP")
if appID == "" {
return nil, fmt.Errorf("FLY_WORKSPACE_APP required (Fly app for workspace machines)")
}
region := os.Getenv("FLY_REGION")
if region == "" {
region = "ord"
}
return &FlyProvisioner{token: token, appID: appID, region: region}, nil
}
// flyMachineRequest is the payload for POST /apps/:app/machines.
type flyMachineRequest struct {
Name string `json:"name"`
Region string `json:"region"`
Config flyMachineConfig `json:"config"`
}
type flyMachineConfig struct {
Image string `json:"image"`
Env map[string]string `json:"env"`
Services []flyService `json:"services,omitempty"`
Guest *flyGuest `json:"guest,omitempty"`
}
type flyService struct {
Ports []flyPort `json:"ports"`
Protocol string `json:"protocol"`
InternalPort int `json:"internal_port"`
}
type flyPort struct {
Port int `json:"port"`
Handlers []string `json:"handlers"`
}
type flyGuest struct {
CPUKind string `json:"cpu_kind"`
CPUs int `json:"cpus"`
MemoryMB int `json:"memory_mb"`
}
type flyMachineResponse struct {
ID string `json:"id"`
Name string `json:"name"`
State string `json:"state"`
InstanceID string `json:"instance_id"`
PrivateIP string `json:"private_ip"`
}
// Start creates and starts a Fly Machine for the workspace.
func (p *FlyProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) {
image := FlyRuntimeImages[cfg.Runtime]
if image == "" {
image = FlyRuntimeImages["langgraph"]
}
name := ContainerName(cfg.WorkspaceID)
env := map[string]string{
"WORKSPACE_ID": cfg.WorkspaceID,
"PLATFORM_URL": cfg.PlatformURL,
"PORT": DefaultPort,
}
if cfg.AwarenessURL != "" {
env["AWARENESS_URL"] = cfg.AwarenessURL
}
if cfg.AwarenessNamespace != "" {
env["AWARENESS_NAMESPACE"] = cfg.AwarenessNamespace
}
// Merge additional env vars (API keys, secrets)
for k, v := range cfg.EnvVars {
env[k] = v
}
memMB := 512
cpus := 1
switch cfg.Tier {
case 3:
memMB = 2048
cpus = 2
case 4:
memMB = 4096
cpus = 4
}
req := flyMachineRequest{
Name: name,
Region: p.region,
Config: flyMachineConfig{
Image: image,
Env: env,
Services: []flyService{
{
InternalPort: 8000,
Protocol: "tcp",
Ports: []flyPort{
{Port: 443, Handlers: []string{"tls", "http"}},
},
},
},
Guest: &flyGuest{
CPUKind: "shared",
CPUs: cpus,
MemoryMB: memMB,
},
},
}
body, err := json.Marshal(req)
if err != nil {
return "", fmt.Errorf("fly: marshal request: %w", err)
}
url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID)
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return "", fmt.Errorf("fly: create request: %w", err)
}
httpReq.Header.Set("Authorization", "Bearer "+p.token)
httpReq.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return "", fmt.Errorf("fly: send request: %w", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return "", fmt.Errorf("fly: create machine failed (%d): %s", resp.StatusCode, string(respBody))
}
var machine flyMachineResponse
if err := json.Unmarshal(respBody, &machine); err != nil {
return "", fmt.Errorf("fly: parse response: %w", err)
}
log.Printf("Fly provisioner: created machine %s (%s) for workspace %s in %s",
machine.ID, machine.Name, cfg.WorkspaceID, p.region)
return machine.ID, nil
}
// Stop destroys the Fly Machine for a workspace.
func (p *FlyProvisioner) Stop(ctx context.Context, workspaceID string) error {
machineID, err := p.findMachine(ctx, workspaceID)
if err != nil {
return err
}
if machineID == "" {
return nil // already gone
}
url := fmt.Sprintf("%s/apps/%s/machines/%s?force=true", flyAPIBase, p.appID, machineID)
req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
req.Header.Set("Authorization", "Bearer "+p.token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fly: delete machine: %w", err)
}
resp.Body.Close()
log.Printf("Fly provisioner: deleted machine %s for workspace %s", machineID, workspaceID)
return nil
}
// IsRunning checks if the workspace's Fly Machine is in "started" state.
func (p *FlyProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
machineID, err := p.findMachine(ctx, workspaceID)
if err != nil {
return false, err
}
if machineID == "" {
return false, nil
}
url := fmt.Sprintf("%s/apps/%s/machines/%s", flyAPIBase, p.appID, machineID)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "Bearer "+p.token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close()
var machine flyMachineResponse
json.NewDecoder(resp.Body).Decode(&machine)
return machine.State == "started", nil
}
// Restart stops and re-creates the machine.
func (p *FlyProvisioner) Restart(ctx context.Context, workspaceID string, cfg WorkspaceConfig) error {
machineID, err := p.findMachine(ctx, workspaceID)
if err != nil {
return err
}
if machineID != "" {
// Restart existing machine
url := fmt.Sprintf("%s/apps/%s/machines/%s/restart", flyAPIBase, p.appID, machineID)
req, _ := http.NewRequestWithContext(ctx, "POST", url, nil)
req.Header.Set("Authorization", "Bearer "+p.token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fly: restart machine: %w", err)
}
resp.Body.Close()
log.Printf("Fly provisioner: restarted machine %s for workspace %s", machineID, workspaceID)
return nil
}
// Machine doesn't exist — create it
_, err = p.Start(ctx, cfg)
return err
}
// findMachine looks up the Fly Machine for a workspace by name.
func (p *FlyProvisioner) findMachine(ctx context.Context, workspaceID string) (string, error) {
name := ContainerName(workspaceID)
url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "Bearer "+p.token)
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("fly: list machines: %w", err)
}
defer resp.Body.Close()
var machines []flyMachineResponse
json.NewDecoder(resp.Body).Decode(&machines)
for _, m := range machines {
if m.Name == name {
return m.ID, nil
}
}
return "", nil
}
// Close is a no-op for the Fly provisioner (no persistent connections).
func (p *FlyProvisioner) Close() error { return nil }