Merge pull request #1067 from Molecule-AI/fix/tenant-workspace-auth

fix(workspace-server): send X-Molecule-Admin-Token on CP calls
This commit is contained in:
Hongming Wang 2026-04-20 08:39:49 -07:00 committed by GitHub
commit 2730a20194
2 changed files with 238 additions and 22 deletions

View File

@ -20,7 +20,8 @@ import (
type CPProvisioner struct {
baseURL string
orgID string
sharedSecret string // bearer passed to CP's /cp/workspaces/* gate
sharedSecret string // Authorization: Bearer — platform-wide gate
adminToken string // X-Molecule-Admin-Token — per-tenant identity (controlplane #118/#130)
httpClient *http.Client
}
@ -40,31 +41,48 @@ func NewCPProvisioner() (*CPProvisioner, error) {
baseURL = "https://api.moleculesai.app"
}
// CP gates /cp/workspaces/* behind a bearer check (C1). Without the
// shared secret the CP returns 401 — or 404 if the routes refused
// to mount on its side. Tenant operators should set this on the
// tenant env to the same value as the CP's PROVISION_SHARED_SECRET.
// CP gates /cp/workspaces/* behind two credentials now:
// 1. Shared secret (Authorization: Bearer) — gates the route at
// the router level, proves the caller is a tenant platform.
// 2. Admin token (X-Molecule-Admin-Token) — proves WHICH tenant.
// Introduced in controlplane #118/#130 to prevent cross-tenant
// provisioning when the shared secret leaks from one tenant.
sharedSecret := os.Getenv("MOLECULE_CP_SHARED_SECRET")
if sharedSecret == "" {
// Fall back to PROVISION_SHARED_SECRET so a single env-var name
// works on both sides of the wire.
sharedSecret = os.Getenv("PROVISION_SHARED_SECRET")
}
// ADMIN_TOKEN is injected into the tenant container at provision
// time by the control plane (see provisioner/ec2.go Secrets Manager
// bootstrap path). Without it, post-#118 CP rejects every
// /cp/workspaces/* call with 401.
adminToken := os.Getenv("ADMIN_TOKEN")
return &CPProvisioner{
baseURL: baseURL,
orgID: orgID,
sharedSecret: sharedSecret,
adminToken: adminToken,
httpClient: &http.Client{Timeout: 120 * time.Second},
}, nil
}
// authHeader sets Authorization: Bearer on the outbound request. No-op
// when sharedSecret is empty so self-hosted / dev deployments still work.
func (p *CPProvisioner) authHeader(req *http.Request) {
// authHeaders sets both auth headers on the outbound request:
// - Authorization: Bearer <shared secret> — platform gate
// - X-Molecule-Admin-Token: <per-tenant token> — identity gate
//
// Either is a no-op when its value is empty so self-hosted / dev
// deployments without a real CP still work (those don't hit a CP that
// enforces either gate). In prod both are set by the controlplane
// bootstrap, so both headers land on every outbound call.
func (p *CPProvisioner) authHeaders(req *http.Request) {
if p.sharedSecret != "" {
req.Header.Set("Authorization", "Bearer "+p.sharedSecret)
}
if p.adminToken != "" {
req.Header.Set("X-Molecule-Admin-Token", p.adminToken)
}
}
type cpProvisionRequest struct {
@ -105,7 +123,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
return "", fmt.Errorf("cp provisioner: create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
p.authHeader(httpReq)
p.authHeaders(httpReq)
resp, err := p.httpClient.Do(httpReq)
if err != nil {
@ -140,7 +158,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID)
req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
p.authHeader(req)
p.authHeaders(req)
resp, err := p.httpClient.Do(req)
if err != nil {
return fmt.Errorf("cp provisioner: stop: %w", err)
@ -153,7 +171,7 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
p.authHeader(req)
p.authHeaders(req)
resp, err := p.httpClient.Do(req)
if err != nil {
return false, err

View File

@ -8,6 +8,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)
// TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't
@ -39,27 +40,50 @@ func TestNewCPProvisioner_FallsBackToProvisionSharedSecret(t *testing.T) {
}
}
// TestAuthHeader_NoopWhenSecretEmpty — the self-hosted path that
// doesn't gate /cp/workspaces/* must not add a stray Authorization
// header (bearer-like content would be surprising to non-bearer
// intermediaries).
func TestAuthHeader_NoopWhenSecretEmpty(t *testing.T) {
p := &CPProvisioner{sharedSecret: ""}
// TestAuthHeaders_NoopWhenBothEmpty — the self-hosted path that
// doesn't gate /cp/workspaces/* must not add stray auth headers
// (bearer-like content would surprise non-bearer intermediaries).
func TestAuthHeaders_NoopWhenBothEmpty(t *testing.T) {
p := &CPProvisioner{sharedSecret: "", adminToken: ""}
req := httptest.NewRequest("GET", "http://x/", nil)
p.authHeader(req)
p.authHeaders(req)
if got := req.Header.Get("Authorization"); got != "" {
t.Errorf("Authorization set to %q with empty secret; want unset", got)
}
if got := req.Header.Get("X-Molecule-Admin-Token"); got != "" {
t.Errorf("X-Molecule-Admin-Token set to %q with empty token; want unset", got)
}
}
// TestAuthHeader_SetsBearerWhenSecretSet — happy path.
func TestAuthHeader_SetsBearerWhenSecretSet(t *testing.T) {
p := &CPProvisioner{sharedSecret: "the-secret"}
// TestAuthHeaders_SetsBothWhenBothProvided — happy path for SaaS
// tenants. Both the platform-wide shared secret and the per-tenant
// admin_token land on every outbound call.
func TestAuthHeaders_SetsBothWhenBothProvided(t *testing.T) {
p := &CPProvisioner{sharedSecret: "the-secret", adminToken: "tok-abc"}
req := httptest.NewRequest("GET", "http://x/", nil)
p.authHeader(req)
p.authHeaders(req)
if got := req.Header.Get("Authorization"); got != "Bearer the-secret" {
t.Errorf("Authorization = %q, want %q", got, "Bearer the-secret")
}
if got := req.Header.Get("X-Molecule-Admin-Token"); got != "tok-abc" {
t.Errorf("X-Molecule-Admin-Token = %q, want tok-abc", got)
}
}
// TestAuthHeaders_OnlyAdminTokenWhenSecretEmpty — in the transition
// window where the tenant has admin_token but PROVISION_SHARED_SECRET
// isn't set, still send the admin token. CP middleware decides whether
// the shared secret is required.
func TestAuthHeaders_OnlyAdminTokenWhenSecretEmpty(t *testing.T) {
p := &CPProvisioner{sharedSecret: "", adminToken: "tok-abc"}
req := httptest.NewRequest("GET", "http://x/", nil)
p.authHeaders(req)
if got := req.Header.Get("Authorization"); got != "" {
t.Errorf("Authorization = %q, want unset", got)
}
if got := req.Header.Get("X-Molecule-Admin-Token"); got != "tok-abc" {
t.Errorf("X-Molecule-Admin-Token = %q, want tok-abc", got)
}
}
// TestStart_HappyPath — Start posts to the stubbed CP, passes the
@ -148,3 +172,177 @@ func TestStart_NoStructuredErrorFallsBackToSize(t *testing.T) {
t.Errorf("expected byte-count fallback, got %q", err.Error())
}
}
// TestStart_TransportFailureSurfaces — the CP isn't reachable at all
// (DNS fails, TCP refused, TLS handshake error). Start must return an
// error tagged with enough context to find the failed call in logs
// without leaking credentials.
func TestStart_TransportFailureSurfaces(t *testing.T) {
// Port 1 is reserved by IANA; connect attempts fail immediately.
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
httpClient: &http.Client{Timeout: 500 * time.Millisecond},
}
_, err := p.Start(context.Background(), WorkspaceConfig{WorkspaceID: "ws-1", Runtime: "py"})
if err == nil {
t.Fatal("expected transport error, got nil")
}
if !strings.Contains(err.Error(), "cp provisioner: send") {
t.Errorf("error should be tagged cp provisioner: send, got %q", err.Error())
}
}
// TestStop_SendsBothAuthHeaders — verify #118/#130 compliance on the
// teardown path. Any call to /cp/workspaces/:id must carry both the
// platform-wide shared secret AND the per-tenant admin token, or the
// CP will 401.
func TestStop_SendsBothAuthHeaders(t *testing.T) {
var sawBearer, sawAdminToken, sawMethod, sawPath string
var sawInstance string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sawBearer = r.Header.Get("Authorization")
sawAdminToken = r.Header.Get("X-Molecule-Admin-Token")
sawMethod = r.Method
sawPath = r.URL.Path
sawInstance = r.URL.Query().Get("instance_id")
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
}
if err := p.Stop(context.Background(), "ws-1"); err != nil {
t.Fatalf("Stop: %v", err)
}
if sawMethod != "DELETE" {
t.Errorf("method = %q, want DELETE", sawMethod)
}
if sawPath != "/cp/workspaces/ws-1" {
t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath)
}
if sawInstance != "ws-1" {
t.Errorf("instance_id query = %q, want ws-1", sawInstance)
}
if sawBearer != "Bearer s3cret" {
t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
}
if sawAdminToken != "tok-xyz" {
t.Errorf("admin token = %q, want tok-xyz", sawAdminToken)
}
}
// TestStop_TransportErrorSurfaces — same treatment as Start. If the
// teardown call hits a dead CP, the error must surface so the caller
// knows the workspace might still be running and needs retry.
func TestStop_TransportErrorSurfaces(t *testing.T) {
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
httpClient: &http.Client{Timeout: 500 * time.Millisecond},
}
err := p.Stop(context.Background(), "ws-1")
if err == nil {
t.Fatal("expected transport error, got nil")
}
if !strings.Contains(err.Error(), "cp provisioner: stop") {
t.Errorf("error should be tagged, got %q", err.Error())
}
}
// TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose
// a bool ("running"/"pending"/"terminated" → true only for "running").
func TestIsRunning_ParsesStateField(t *testing.T) {
cases := map[string]bool{
"running": true,
"pending": false,
"stopping": false,
"terminated": false,
}
for state, want := range cases {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/cp/workspaces/ws-1/status" {
t.Errorf("path = %q", r.URL.Path)
}
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, `{"state":"`+state+`"}`)
}))
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
}
got, err := p.IsRunning(context.Background(), "ws-1")
srv.Close()
if err != nil {
t.Errorf("state=%s: IsRunning error %v", state, err)
continue
}
if got != want {
t.Errorf("state=%s: got %v, want %v", state, got, want)
}
}
}
// TestIsRunning_SendsBothAuthHeaders — parity with Stop. Status reads
// require the same per-tenant auth because they leak public_ip +
// private_ip to the caller.
func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
var sawBearer, sawAdminToken string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sawBearer = r.Header.Get("Authorization")
sawAdminToken = r.Header.Get("X-Molecule-Admin-Token")
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, `{"state":"running"}`)
}))
defer srv.Close()
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
}
_, _ = p.IsRunning(context.Background(), "ws-1")
if sawBearer != "Bearer s3cret" {
t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
}
if sawAdminToken != "tok-xyz" {
t.Errorf("admin token = %q, want tok-xyz", sawAdminToken)
}
}
// TestIsRunning_TransportErrorReturnsFalse — when the CP is
// unreachable, IsRunning must not claim the workspace is running
// (that'd mislead the sweeper into leaving a dead row in place).
func TestIsRunning_TransportErrorReturnsFalse(t *testing.T) {
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
httpClient: &http.Client{Timeout: 500 * time.Millisecond},
}
got, err := p.IsRunning(context.Background(), "ws-1")
if err == nil {
t.Errorf("expected transport error, got nil (got=%v)", got)
}
if got {
t.Errorf("transport failure must not report running=true")
}
}
// TestClose_Noop — explicit contract: Close has no side effects and
// no error. Exists for the Provisioner interface; compliance guard.
func TestClose_Noop(t *testing.T) {
p := &CPProvisioner{}
if err := p.Close(); err != nil {
t.Errorf("Close should return nil, got %v", err)
}
}