forked from molecule-ai/molecule-core
Merge pull request #504 from Molecule-AI/fix/code-review-final-batch
fix: code review — dead code, DRY, rate limit, docs
This commit is contained in:
commit
a18e0182d5
@ -17,6 +17,8 @@ PLUGINS_DIR= # Path to plugins/ directory (default: /plugins i
|
||||
# WORKSPACE_DIR= # Optional global host path bind-mounted to /workspace in every container. Per-workspace workspace_dir column overrides this; if neither is set each workspace gets an isolated Docker named volume.
|
||||
# MOLECULE_ENV=development # Environment label (development/staging/production). Used for log tagging and conditional behaviour.
|
||||
# MOLECULE_ENABLE_TEST_TOKENS= # Set to 1 to expose GET /admin/workspaces/:id/test-token (mints a fresh bearer token for E2E scripts). The route is auto-enabled when MOLECULE_ENV != production; this flag is the explicit override. Leave unset/0 in prod — the route 404s unless enabled.
|
||||
# MOLECULE_ORG_ID= # SaaS only: org UUID set by control plane on tenant machines. When set, workspace provisioning auto-routes through the control plane API instead of Docker.
|
||||
# CP_PROVISION_URL= # Override control plane URL for workspace provisioning (default: https://api.moleculesai.app). Only needed for testing against a non-production control plane.
|
||||
|
||||
# CORS / rate limiting
|
||||
# CORS_ORIGINS=http://localhost:3000,http://localhost:3001 # Comma-separated allowed origins for the HTTP API.
|
||||
|
||||
@ -438,6 +438,8 @@ Three Gin middleware classes gate server-side routes — pick the right one. Ful
|
||||
| GET/POST/DELETE | /workspaces/:id/plugins[/:name] | plugins.go — list, install (`{"source":"scheme://spec"}`), uninstall per-workspace |
|
||||
| GET | /workspaces/:id/plugins/available | plugins.go (filtered by workspace runtime) |
|
||||
| GET | /workspaces/:id/plugins/compatibility?runtime=X | plugins.go (preflight runtime-change check) |
|
||||
| GET/POST | /workspaces/:id/tokens | tokens.go — list active tokens (prefix + metadata), create new token (plaintext returned once). Max 50 per workspace. |
|
||||
| DELETE | /workspaces/:id/tokens/:tokenId | tokens.go — revoke specific token by ID |
|
||||
| GET | /bundles/export/:id | bundle.go — `AdminAuth` (#165 / PR #167) |
|
||||
| POST | /bundles/import | bundle.go — `AdminAuth` (#164 CRITICAL / PR #167) |
|
||||
| GET | /org/templates | org.go (list available org templates) |
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@ -73,11 +74,25 @@ func (h *TokenHandler) List(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// maxTokensPerWorkspace prevents unbounded token creation. 50 is generous —
|
||||
// most workspaces need 1-3 tokens (primary + rotation spare).
|
||||
const maxTokensPerWorkspace = 50
|
||||
|
||||
// Create mints a new token for the workspace. The plaintext is returned
|
||||
// exactly once in the response — it cannot be recovered afterwards.
|
||||
func (h *TokenHandler) Create(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
// Rate limit: max active tokens per workspace
|
||||
var count int
|
||||
db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT COUNT(*) FROM workspace_auth_tokens WHERE workspace_id = $1 AND revoked_at IS NULL`,
|
||||
workspaceID).Scan(&count)
|
||||
if count >= maxTokensPerWorkspace {
|
||||
c.JSON(http.StatusTooManyRequests, gin.H{"error": fmt.Sprintf("maximum %d active tokens per workspace", maxTokensPerWorkspace)})
|
||||
return
|
||||
}
|
||||
|
||||
token, err := wsauth.IssueToken(c.Request.Context(), db.DB, workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("tokens: issue failed for %s: %v", workspaceID, err)
|
||||
|
||||
@ -457,20 +457,10 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
|
||||
return files
|
||||
}
|
||||
|
||||
// provisionWorkspaceFly provisions a workspace as a Fly Machine instead
|
||||
// of a local Docker container. Same secret-loading + env-var logic as
|
||||
// provisionWorkspaceFly is removed — use provisionWorkspaceCP instead.
|
||||
// Direct Fly provisioning from the tenant was replaced by the control
|
||||
// plane architecture (the CP holds the Fly token, manages billing/quotas).
|
||||
|
||||
// provisionWorkspaceCP provisions a workspace via the control plane API.
|
||||
// The control plane holds the Fly token and manages billing/quotas — the
|
||||
// tenant platform never talks to Fly directly.
|
||||
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Load secrets (same as Docker/Fly paths)
|
||||
// loadWorkspaceSecrets loads global + workspace-specific secrets into a map.
|
||||
// Returns nil map + error string on decrypt failure. Shared by both Docker
|
||||
// and control plane provisioning paths to avoid duplication.
|
||||
func loadWorkspaceSecrets(ctx context.Context, workspaceID string) (map[string]string, string) {
|
||||
envVars := map[string]string{}
|
||||
globalRows, globalErr := db.DB.QueryContext(ctx,
|
||||
`SELECT key, encrypted_value, encryption_version FROM global_secrets`)
|
||||
@ -483,10 +473,7 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
|
||||
if globalRows.Scan(&k, &v, &ver) == nil {
|
||||
decrypted, decErr := crypto.DecryptVersioned(v, ver)
|
||||
if decErr != nil {
|
||||
log.Printf("CPProvisioner: failed to decrypt global secret %s for %s: %v", k, workspaceID, decErr)
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
|
||||
workspaceID, fmt.Sprintf("cannot decrypt global secret %s", k))
|
||||
return
|
||||
return nil, fmt.Sprintf("cannot decrypt global secret %s: %v", k, decErr)
|
||||
}
|
||||
envVars[k] = string(decrypted)
|
||||
}
|
||||
@ -501,11 +488,29 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
|
||||
var v []byte
|
||||
var ver int
|
||||
if wsRows.Scan(&k, &v, &ver) == nil {
|
||||
decrypted, _ := crypto.DecryptVersioned(v, ver)
|
||||
decrypted, decErr := crypto.DecryptVersioned(v, ver)
|
||||
if decErr != nil {
|
||||
return nil, fmt.Sprintf("cannot decrypt workspace secret %s: %v", k, decErr)
|
||||
}
|
||||
envVars[k] = string(decrypted)
|
||||
}
|
||||
}
|
||||
}
|
||||
return envVars, ""
|
||||
}
|
||||
|
||||
// provisionWorkspaceCP provisions a workspace via the control plane API.
|
||||
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
||||
defer cancel()
|
||||
|
||||
envVars, decryptErr := loadWorkspaceSecrets(ctx, workspaceID)
|
||||
if decryptErr != "" {
|
||||
log.Printf("CPProvisioner: %s for %s", decryptErr, workspaceID)
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
|
||||
workspaceID, decryptErr)
|
||||
return
|
||||
}
|
||||
|
||||
applyAgentGitIdentity(envVars, payload.Name)
|
||||
if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil {
|
||||
|
||||
@ -1,292 +0,0 @@
|
||||
package provisioner
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// FlyRuntimeImages maps runtime names to their GHCR image tags for Fly.
// These are the same images as RuntimeImages but use the full registry path
// since Fly machines pull from a registry, not a local Docker daemon.
//
// Start looks a workspace's runtime up in this map and falls back to the
// "langgraph" entry when the runtime is not listed, so that key must stay
// present.
//
// NOTE(review): every entry uses the ":latest" tag, so the exact image a
// machine runs is presumably whatever the registry serves at pull time —
// confirm that is intended.
var FlyRuntimeImages = map[string]string{
	"langgraph":   "ghcr.io/molecule-ai/workspace-langgraph:latest",
	"claude-code": "ghcr.io/molecule-ai/workspace-claude-code:latest",
	"openclaw":    "ghcr.io/molecule-ai/workspace-openclaw:latest",
	"deepagents":  "ghcr.io/molecule-ai/workspace-deepagents:latest",
	"crewai":      "ghcr.io/molecule-ai/workspace-crewai:latest",
	"autogen":     "ghcr.io/molecule-ai/workspace-autogen:latest",
	"hermes":      "ghcr.io/molecule-ai/workspace-hermes:latest",
	"gemini-cli":  "ghcr.io/molecule-ai/workspace-gemini-cli:latest",
}
|
||||
|
||||
const (
	// flyAPIBase is the root URL that every Fly Machines API request in this
	// file is built from (…/apps/<app>/machines[/<id>]).
	flyAPIBase = "https://api.machines.dev/v1"
	// flyDefaultSize names a Fly machine size preset.
	// NOTE(review): not referenced anywhere in the visible part of this
	// file — Start sizes machines through an explicit guest config instead.
	flyDefaultSize = "shared-cpu-1x"
)
|
||||
|
||||
// FlyProvisioner provisions workspace agents as Fly Machines instead of
// local Docker containers. Used on SaaS tenants where no Docker daemon
// is available. Set CONTAINER_BACKEND=flyio to activate.
//
// All fields are populated from the environment by NewFlyProvisioner.
type FlyProvisioner struct {
	token  string // Bearer token for the Fly Machines API (FLY_API_TOKEN)
	appID  string // Fly app to create machines in (FLY_WORKSPACE_APP)
	region string // Fly region (FLY_REGION, default "ord")
}

// NewFlyProvisioner creates a provisioner that manages workspaces as Fly
// Machines, configured entirely from environment variables:
//
//   - FLY_API_TOKEN     (required) API token sent as a Bearer credential.
//   - FLY_WORKSPACE_APP (required) Fly app that workspace machines live in.
//   - FLY_REGION        (optional) region for new machines; defaults to "ord".
//
// It returns an error, and a nil provisioner, when either required variable
// is unset or empty.
func NewFlyProvisioner() (*FlyProvisioner, error) {
	token := os.Getenv("FLY_API_TOKEN")
	if token == "" {
		return nil, fmt.Errorf("FLY_API_TOKEN required for Fly provisioner")
	}

	appID := os.Getenv("FLY_WORKSPACE_APP")
	if appID == "" {
		return nil, fmt.Errorf("FLY_WORKSPACE_APP required (Fly app for workspace machines)")
	}

	region := os.Getenv("FLY_REGION")
	if region == "" {
		region = "ord"
	}

	return &FlyProvisioner{token: token, appID: appID, region: region}, nil
}
|
||||
|
||||
// Wire types for the Fly Machines API. Field order and JSON tags define the
// on-the-wire shape and must not be changed casually.
type (
	// flyMachineRequest is the payload for POST /apps/:app/machines.
	flyMachineRequest struct {
		Name   string           `json:"name"`
		Region string           `json:"region"`
		Config flyMachineConfig `json:"config"`
	}

	// flyMachineConfig is the "config" object of a machine-create request:
	// the image to run, its environment, and optional services and guest
	// sizing (both omitted from the JSON when unset).
	flyMachineConfig struct {
		Image    string            `json:"image"`
		Env      map[string]string `json:"env"`
		Services []flyService      `json:"services,omitempty"`
		Guest    *flyGuest         `json:"guest,omitempty"`
	}

	// flyService maps one or more external ports onto a port inside the
	// machine.
	flyService struct {
		Ports        []flyPort `json:"ports"`
		Protocol     string    `json:"protocol"`
		InternalPort int       `json:"internal_port"`
	}

	// flyPort is a single external port together with the handlers applied
	// to traffic arriving on it.
	flyPort struct {
		Port     int      `json:"port"`
		Handlers []string `json:"handlers"`
	}

	// flyGuest carries the CPU and memory sizing for a machine.
	flyGuest struct {
		CPUKind  string `json:"cpu_kind"`
		CPUs     int    `json:"cpus"`
		MemoryMB int    `json:"memory_mb"`
	}

	// flyMachineResponse is the subset of a machine object that this
	// package decodes from API responses.
	flyMachineResponse struct {
		ID         string `json:"id"`
		Name       string `json:"name"`
		State      string `json:"state"`
		InstanceID string `json:"instance_id"`
		PrivateIP  string `json:"private_ip"`
	}
)
|
||||
|
||||
// Start creates and starts a Fly Machine for the workspace.
|
||||
func (p *FlyProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, error) {
|
||||
image := FlyRuntimeImages[cfg.Runtime]
|
||||
if image == "" {
|
||||
image = FlyRuntimeImages["langgraph"]
|
||||
}
|
||||
|
||||
name := ContainerName(cfg.WorkspaceID)
|
||||
|
||||
env := map[string]string{
|
||||
"WORKSPACE_ID": cfg.WorkspaceID,
|
||||
"PLATFORM_URL": cfg.PlatformURL,
|
||||
"PORT": DefaultPort,
|
||||
}
|
||||
if cfg.AwarenessURL != "" {
|
||||
env["AWARENESS_URL"] = cfg.AwarenessURL
|
||||
}
|
||||
if cfg.AwarenessNamespace != "" {
|
||||
env["AWARENESS_NAMESPACE"] = cfg.AwarenessNamespace
|
||||
}
|
||||
// Merge additional env vars (API keys, secrets)
|
||||
for k, v := range cfg.EnvVars {
|
||||
env[k] = v
|
||||
}
|
||||
|
||||
memMB := 512
|
||||
cpus := 1
|
||||
switch cfg.Tier {
|
||||
case 3:
|
||||
memMB = 2048
|
||||
cpus = 2
|
||||
case 4:
|
||||
memMB = 4096
|
||||
cpus = 4
|
||||
}
|
||||
|
||||
req := flyMachineRequest{
|
||||
Name: name,
|
||||
Region: p.region,
|
||||
Config: flyMachineConfig{
|
||||
Image: image,
|
||||
Env: env,
|
||||
Services: []flyService{
|
||||
{
|
||||
InternalPort: 8000,
|
||||
Protocol: "tcp",
|
||||
Ports: []flyPort{
|
||||
{Port: 443, Handlers: []string{"tls", "http"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
Guest: &flyGuest{
|
||||
CPUKind: "shared",
|
||||
CPUs: cpus,
|
||||
MemoryMB: memMB,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("fly: marshal request: %w", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID)
|
||||
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("fly: create request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Authorization", "Bearer "+p.token)
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("fly: send request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
return "", fmt.Errorf("fly: create machine failed (%d): %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
var machine flyMachineResponse
|
||||
if err := json.Unmarshal(respBody, &machine); err != nil {
|
||||
return "", fmt.Errorf("fly: parse response: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("Fly provisioner: created machine %s (%s) for workspace %s in %s",
|
||||
machine.ID, machine.Name, cfg.WorkspaceID, p.region)
|
||||
|
||||
return machine.ID, nil
|
||||
}
|
||||
|
||||
// Stop destroys the Fly Machine for a workspace.
|
||||
func (p *FlyProvisioner) Stop(ctx context.Context, workspaceID string) error {
|
||||
machineID, err := p.findMachine(ctx, workspaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if machineID == "" {
|
||||
return nil // already gone
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/apps/%s/machines/%s?force=true", flyAPIBase, p.appID, machineID)
|
||||
req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
|
||||
req.Header.Set("Authorization", "Bearer "+p.token)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fly: delete machine: %w", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
log.Printf("Fly provisioner: deleted machine %s for workspace %s", machineID, workspaceID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning checks if the workspace's Fly Machine is in "started" state.
|
||||
func (p *FlyProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
|
||||
machineID, err := p.findMachine(ctx, workspaceID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if machineID == "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/apps/%s/machines/%s", flyAPIBase, p.appID, machineID)
|
||||
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
req.Header.Set("Authorization", "Bearer "+p.token)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var machine flyMachineResponse
|
||||
json.NewDecoder(resp.Body).Decode(&machine)
|
||||
return machine.State == "started", nil
|
||||
}
|
||||
|
||||
// Restart stops and re-creates the machine.
|
||||
func (p *FlyProvisioner) Restart(ctx context.Context, workspaceID string, cfg WorkspaceConfig) error {
|
||||
machineID, err := p.findMachine(ctx, workspaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if machineID != "" {
|
||||
// Restart existing machine
|
||||
url := fmt.Sprintf("%s/apps/%s/machines/%s/restart", flyAPIBase, p.appID, machineID)
|
||||
req, _ := http.NewRequestWithContext(ctx, "POST", url, nil)
|
||||
req.Header.Set("Authorization", "Bearer "+p.token)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fly: restart machine: %w", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
log.Printf("Fly provisioner: restarted machine %s for workspace %s", machineID, workspaceID)
|
||||
return nil
|
||||
}
|
||||
// Machine doesn't exist — create it
|
||||
_, err = p.Start(ctx, cfg)
|
||||
return err
|
||||
}
|
||||
|
||||
// findMachine looks up the Fly Machine for a workspace by name.
|
||||
func (p *FlyProvisioner) findMachine(ctx context.Context, workspaceID string) (string, error) {
|
||||
name := ContainerName(workspaceID)
|
||||
url := fmt.Sprintf("%s/apps/%s/machines", flyAPIBase, p.appID)
|
||||
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
req.Header.Set("Authorization", "Bearer "+p.token)
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("fly: list machines: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var machines []flyMachineResponse
|
||||
json.NewDecoder(resp.Body).Decode(&machines)
|
||||
|
||||
for _, m := range machines {
|
||||
if m.Name == name {
|
||||
return m.ID, nil
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Close is a no-op for the Fly provisioner (no persistent connections).
|
||||
func (p *FlyProvisioner) Close() error { return nil }
|
||||
Loading…
Reference in New Issue
Block a user