Merge pull request #986 from Molecule-AI/feat/tenant-cp-env-refresh

feat(ws-server): pull env from CP on startup
This commit is contained in:
Hongming Wang 2026-04-19 03:27:14 -07:00 committed by GitHub
commit de2a4cb50e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 219 additions and 1 deletions

View File

@ -1 +1,2 @@
server
# The compiled binary, not the cmd/server package.
/server

View File

@ -0,0 +1,107 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
)
// refreshEnvFromCP pulls the tenant's current config-plane env vars
// from the control plane and applies them via os.Setenv BEFORE any
// other code calls os.Getenv on them.
//
// Why:
// - user-data on the tenant EC2 bakes env vars into `docker run` at
// provision time. Those values are frozen. When we rotate a secret
// on CP (e.g. PROVISION_SHARED_SECRET) there's no way to push the
// new value into already-provisioned tenants.
// - the Docker image auto-updater already pulls the latest workspace-
// server image every 5 min. If THAT image knows how to refresh its
// own env from the CP on startup, every tenant heals itself within
// the update cycle — no ssh, no re-provision, no ops toil.
//
// Contract (paired with cp-side GET /cp/tenants/config):
// Request: GET {MOLECULE_CP_URL or https://api.moleculesai.app}/cp/tenants/config
// Authorization: Bearer <ADMIN_TOKEN>
// X-Molecule-Org-Id: <MOLECULE_ORG_ID>
// Response: 200 {"MOLECULE_CP_SHARED_SECRET":"…","MOLECULE_CP_URL":"…", …}
// 401 on bearer mismatch or unknown org
//
// Best-effort: any failure logs and returns — main() keeps booting.
// Self-hosted deploys without MOLECULE_ORG_ID or ADMIN_TOKEN set
// short-circuit silently so this function is a no-op there.
func refreshEnvFromCP() error {
orgID := os.Getenv("MOLECULE_ORG_ID")
adminToken := os.Getenv("ADMIN_TOKEN")
if orgID == "" || adminToken == "" {
// Not a SaaS tenant (self-hosted dev or not yet provisioned).
return nil
}
base := os.Getenv("MOLECULE_CP_URL")
if base == "" {
// Default to prod for any tenant that lost track of its CP URL
// (e.g. older user-data that only set MOLECULE_ORG_ID).
base = "https://api.moleculesai.app"
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", base+"/cp/tenants/config", nil)
if err != nil {
return fmt.Errorf("build request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+adminToken)
req.Header.Set("X-Molecule-Org-Id", orgID)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()
// 64 KiB cap — the CP only returns small JSON blobs here. An
// unbounded read would be weaponizable if a compromised upstream
// ever echoed back a gigabyte.
body, err := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
if err != nil {
return fmt.Errorf("read body: %w", err)
}
if resp.StatusCode != http.StatusOK {
// 401 on first boot-after-restart is expected for tenants still
// running under old user-data where admin_token on-disk hasn't
// had its corresponding row seeded. Don't treat as fatal — just
// log so operators can spot repeat offenders in logs.
return fmt.Errorf("cp returned %d", resp.StatusCode)
}
var cfg map[string]string
if err := json.Unmarshal(body, &cfg); err != nil {
return fmt.Errorf("decode: %w", err)
}
// Apply only strings; reject oversized values defensively. An
// operator-supplied config should never exceed 4 KiB per key —
// workspace-server env vars are URLs, hex secrets, short identifiers.
const maxValueBytes = 4 << 10
applied := 0
for k, v := range cfg {
if k == "" || len(v) > maxValueBytes {
continue
}
if err := os.Setenv(k, v); err != nil {
log.Printf("CP env refresh: setenv %s: %v", k, err)
continue
}
applied++
}
log.Printf("CP env refresh: applied %d values from %s/cp/tenants/config", applied, base)
return nil
}

View File

@ -0,0 +1,100 @@
package main
import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
)
// TestRefreshEnvFromCP_NoopWhenNotSaaS: without MOLECULE_ORG_ID or
// ADMIN_TOKEN, the function short-circuits silently — self-hosted dev
// must not fail or log spam here.
func TestRefreshEnvFromCP_NoopWhenNotSaaS(t *testing.T) {
t.Setenv("MOLECULE_ORG_ID", "")
t.Setenv("ADMIN_TOKEN", "")
if err := refreshEnvFromCP(); err != nil {
t.Errorf("expected nil on non-SaaS, got %v", err)
}
}
// TestRefreshEnvFromCP_AppliesCPResponse: wire a stub CP, run refresh,
// confirm the returned env vars ended up in os.Environ().
func TestRefreshEnvFromCP_AppliesCPResponse(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got := r.Header.Get("Authorization"); got != "Bearer tenant-admin-token" {
t.Errorf("bearer: got %q", got)
}
if got := r.Header.Get("X-Molecule-Org-Id"); got != "org-abc" {
t.Errorf("org id header: got %q", got)
}
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"MOLECULE_CP_SHARED_SECRET":"new-secret","MOLECULE_CP_URL":"https://api.moleculesai.app"}`)
}))
defer srv.Close()
t.Setenv("MOLECULE_ORG_ID", "org-abc")
t.Setenv("ADMIN_TOKEN", "tenant-admin-token")
t.Setenv("MOLECULE_CP_URL", srv.URL)
t.Setenv("MOLECULE_CP_SHARED_SECRET", "") // clear before refresh
if err := refreshEnvFromCP(); err != nil {
t.Fatalf("refreshEnvFromCP: %v", err)
}
if got := os.Getenv("MOLECULE_CP_SHARED_SECRET"); got != "new-secret" {
t.Errorf("SHARED_SECRET: want new-secret, got %q", got)
}
}
// TestRefreshEnvFromCP_CPUnreachableDoesNotFailBoot: network errors must
// return non-nil BUT main.go treats that as warn-and-continue. We assert
// the function returns an error (not a panic) so the caller can log.
func TestRefreshEnvFromCP_CPUnreachableDoesNotFailBoot(t *testing.T) {
t.Setenv("MOLECULE_ORG_ID", "org-abc")
t.Setenv("ADMIN_TOKEN", "t")
t.Setenv("MOLECULE_CP_URL", "http://127.0.0.1:1") // closed port
err := refreshEnvFromCP()
if err == nil {
t.Error("expected an error when CP is unreachable")
}
}
// TestRefreshEnvFromCP_NonOKPropagates: CP returns 500 → error.
func TestRefreshEnvFromCP_NonOKPropagates(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "boom", http.StatusInternalServerError)
}))
defer srv.Close()
t.Setenv("MOLECULE_ORG_ID", "org-abc")
t.Setenv("ADMIN_TOKEN", "t")
t.Setenv("MOLECULE_CP_URL", srv.URL)
if err := refreshEnvFromCP(); err == nil {
t.Error("expected error on 500, got nil")
}
}
// TestRefreshEnvFromCP_RejectsOversizedValue: a single-value-over-4KiB
// payload must NOT poison the environment.
func TestRefreshEnvFromCP_RejectsOversizedValue(t *testing.T) {
giant := make([]byte, 5<<10)
for i := range giant {
giant[i] = 'x'
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{"MOLECULE_CP_SHARED_SECRET":%q}`, string(giant))
}))
defer srv.Close()
t.Setenv("MOLECULE_ORG_ID", "org-abc")
t.Setenv("ADMIN_TOKEN", "t")
t.Setenv("MOLECULE_CP_URL", srv.URL)
t.Setenv("MOLECULE_CP_SHARED_SECRET", "original")
if err := refreshEnvFromCP(); err != nil {
t.Fatalf("refreshEnvFromCP: %v", err)
}
if got := os.Getenv("MOLECULE_CP_SHARED_SECRET"); got != "original" {
t.Errorf("oversized value was applied — want %q, got %d bytes",
"original", len(got))
}
}

View File

@ -30,6 +30,16 @@ import (
)
func main() {
// CP self-refresh: pull any operator-rotated config (e.g. a new
// MOLECULE_CP_SHARED_SECRET) before any other code reads env.
// Best-effort — if the CP is unreachable we keep booting with the
// env we were provisioned with. Older SaaS tenants predate PR #53
// and can arrive here with MOLECULE_CP_SHARED_SECRET unset; this
// is how they heal without SSH.
if err := refreshEnvFromCP(); err != nil {
log.Printf("CP env refresh: %v (continuing with baked-in env)", err)
}
// Secrets encryption. In MOLECULE_ENV=prod, boot refuses to start
// without a valid SECRETS_ENCRYPTION_KEY (fail-secure — Top-5 #5).
// In any other environment, missing keys just log a warning and