Merge remote-tracking branch 'origin/staging' into fix/coverage-gate-platform-go-1823

rabbitblood 2026-04-23 11:26:22 -07:00
commit 1a084426da
5 changed files with 215 additions and 34 deletions

View File

@@ -3,6 +3,7 @@ package provisioner
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
@@ -10,6 +11,8 @@ import (
"net/http"
"os"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// CPProvisioner provisions workspace agents by calling the control plane's
@@ -182,8 +185,26 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
}
// Stop terminates the workspace's EC2 instance via the control plane.
//
// Looks up the actual EC2 instance_id from the workspaces table before
// calling CP — earlier versions passed workspaceID (a UUID) as the
// instance_id query param; CP forwarded that UUID to EC2 TerminateInstances,
// which rejected it with InvalidInstanceID.Malformed (EC2 IDs are i-…, not
// UUIDs). The terminate failure then left the workspace's SG attached,
// blocking the next provision with InvalidGroup.Duplicate — a full
// "Save & Restart" crash on SaaS.
func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID)
instanceID, err := resolveInstanceID(ctx, workspaceID)
if err != nil {
return fmt.Errorf("cp provisioner: stop: resolve instance_id: %w", err)
}
if instanceID == "" {
// No instance was ever provisioned (or already deprovisioned and
// the column was cleared). Nothing to terminate — idempotent.
log.Printf("CP provisioner: Stop for %s — no instance_id on file, nothing to do", workspaceID)
return nil
}
url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, instanceID)
req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
p.provisionAuthHeaders(req)
resp, err := p.httpClient.Do(req)
@@ -194,6 +215,35 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
return nil
}
// resolveInstanceID reads workspaces.instance_id for the given workspace.
// Returns ("", nil) when the row exists but has no instance_id recorded
// (edge case for external workspaces or stale rows). Returns an error
// only on real DB failures, not on missing rows — callers (Stop,
// IsRunning) treat the empty string as "nothing to act on."
//
// Exposed as a package var so tests can substitute a stub without
// standing up a sqlmock just to unblock the Stop/IsRunning code path.
// Production code never reassigns it.
var resolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
if db.DB == nil {
// Defensive: NewCPProvisioner never runs without db.DB being
// set in main(). If somehow nil, treat as "no instance" rather
// than panicking in the Stop/IsRunning path.
return "", nil
}
var instanceID sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&instanceID)
if err != nil && err != sql.ErrNoRows {
return "", err
}
if !instanceID.Valid {
return "", nil
}
return instanceID.String, nil
}
// IsRunning checks workspace EC2 instance state via the control plane.
//
// Contract (matches the Docker Provisioner.IsRunning contract —
@@ -219,7 +269,18 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
// Both callers are happy with (true, err); callers that need the
// previous (false, err) shape must inspect err themselves.
func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID)
instanceID, err := resolveInstanceID(ctx, workspaceID)
if err != nil {
// Treat DB errors the same as transport errors — (true, err) keeps
// a2a_proxy on the alive path and logs the signal.
return true, fmt.Errorf("cp provisioner: status: resolve instance_id: %w", err)
}
if instanceID == "" {
// No instance recorded. Report "not running" cleanly (no error)
// so restart cascades can trigger a fresh provision.
return false, nil
}
url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, instanceID)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
p.provisionAuthHeaders(req)
resp, err := p.httpClient.Do(req)
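
A caller-side note on the (true, err) contract above: the sketch below shows how a sweeper-style consumer (the healthsweep.go behavior described in the test comments) might honor it, treating any error as "state unknown" and acting only on a clean (false, nil). This is an illustration, not code from this change; the statusChecker interface and sweepOne function are hypothetical names, assuming only the IsRunning signature shown in this file.

package provisioner_sketch // hypothetical package, for illustration only

import (
    "context"
    "log"
)

// statusChecker is an assumed narrow view of CPProvisioner.IsRunning.
type statusChecker interface {
    IsRunning(ctx context.Context, workspaceID string) (bool, error)
}

// sweepOne skips on any error (CP unreachable, non-2xx, DB lookup failure),
// since the state is unknown; only a clean (false, nil), e.g. no instance_id
// on file or a terminal EC2 state, marks the workspace as not running.
func sweepOne(ctx context.Context, p statusChecker, workspaceID string) {
    running, err := p.IsRunning(ctx, workspaceID)
    if err != nil {
        log.Printf("sweep: %s: status unknown, skipping this pass: %v", workspaceID, err)
        return
    }
    if !running {
        log.Printf("sweep: %s: not running, candidate for re-provision", workspaceID)
    }
}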

View File

@@ -11,6 +11,23 @@ import (
"time"
)
// primeInstanceIDLookup swaps resolveInstanceID for a stub that returns
// the mapped instance_id for the given workspace_id, or "" for anything
// not in the map. Cheaper than standing up a sqlmock since Stop/IsRunning
// tests mostly don't care about the SQL path — they're testing the CP
// HTTP interaction downstream of the lookup.
func primeInstanceIDLookup(t *testing.T, pairs map[string]string) {
t.Helper()
prev := resolveInstanceID
resolveInstanceID = func(_ context.Context, wsID string) (string, error) {
if id, ok := pairs[wsID]; ok {
return id, nil
}
return "", nil
}
t.Cleanup(func() { resolveInstanceID = prev })
}
// TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't
// have a MOLECULE_ORG_ID, and the provisioner must refuse to construct
// rather than silently phone home to the prod CP with an empty tenant.
@@ -267,6 +284,12 @@ func TestStart_TransportFailureSurfaces(t *testing.T) {
// platform-wide shared secret AND the per-tenant admin token, or the
// CP will 401.
func TestStop_SendsBothAuthHeaders(t *testing.T) {
// resolveInstanceID looks up the real EC2 id from the workspaces
// table; previously this test passed even while the provisioner
// buggily reused workspaceID as the instance_id. Now we assert the correct
// EC2 id round-trips.
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"})
var sawBearer, sawAdminToken, sawMethod, sawPath string
var sawInstance string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -295,8 +318,8 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) {
if sawPath != "/cp/workspaces/ws-1" {
t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath)
}
if sawInstance != "ws-1" {
t.Errorf("instance_id query = %q, want ws-1", sawInstance)
if sawInstance != "i-abc123" {
t.Errorf("instance_id query = %q, want i-abc123 (from DB lookup, NOT the workspace UUID)", sawInstance)
}
if sawBearer != "Bearer s3cret" {
t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
@@ -310,6 +333,7 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) {
// teardown call hits a dead CP, the error must surface so the caller
// knows the workspace might still be running and needs retry.
func TestStop_TransportErrorSurfaces(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"})
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
@@ -327,6 +351,7 @@ func TestStop_TransportErrorSurfaces(t *testing.T) {
// TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose
// a bool ("running"/"pending"/"terminated" → true only for "running").
func TestIsRunning_ParsesStateField(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-parsesstatefield"})
cases := map[string]bool{
"running": true,
"pending": false,
@@ -364,6 +389,7 @@ func TestIsRunning_ParsesStateField(t *testing.T) {
// require the same per-tenant auth because they leak public_ip +
// private_ip to the caller.
func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-auth-headers"})
var sawBearer, sawAdminToken string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sawBearer = r.Header.Get("Authorization")
@@ -398,6 +424,7 @@ func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
// The sweeper (healthsweep.go) inspects err independently and skips
// on any error, so (true, err) is equally safe for that caller.
func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-transport"})
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
@@ -418,6 +445,7 @@ func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
// the sweeper would see the workspace as not-running. Now every
// non-2xx is an error the caller can log + retry.
func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-non2xx"})
cases := []struct {
name string
status int
@@ -459,6 +487,7 @@ func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
// a middleware glitch (HTML error page with 200) from looking like
// "workspace stopped".
func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-malformed"})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
_, _ = io.WriteString(w, "<html>maintenance mode</html>")
@@ -486,6 +515,7 @@ func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
// the documented contract values rather than simulating the whole
// a2a_proxy flow.
func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-contract"})
// Simulate every error path and assert running==true for each.
t.Run("transport error", func(t *testing.T) {
p := &CPProvisioner{
@@ -559,6 +589,7 @@ func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
// only needs the prefix to produce a value, so the decode succeeds —
// and the LimitReader enforces the cap regardless.
func TestIsRunning_BoundedBodyRead(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-bounded"})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
// Valid JSON prefix, then pad well past the 64 KiB cap.
@@ -628,3 +659,67 @@ func TestGetConsoleOutput_UsesAdminBearer(t *testing.T) {
t.Errorf("bearer = %q, want Bearer admin-api-key (NOT the provision secret)", sawBearer)
}
}
// TestStop_EmptyInstanceIDIsNoop — when workspaces.instance_id is NULL
// (e.g. a row that was never provisioned, or deprovisioned and cleared),
// Stop should be a no-op instead of sending a malformed CP request.
// Regression guard: previously Stop sent workspaceID as instance_id
// even when no EC2 instance had ever been provisioned, causing CP → EC2 to 400.
func TestStop_EmptyInstanceIDIsNoop(t *testing.T) {
// Empty map → lookup returns ("", nil) for any workspace.
primeInstanceIDLookup(t, map[string]string{})
hit := false
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hit = true
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
if err := p.Stop(context.Background(), "ws-ghost"); err != nil {
t.Fatalf("Stop with empty instance_id should no-op, got err %v", err)
}
if hit {
t.Errorf("Stop contacted CP even though instance_id was empty")
}
}
// TestIsRunning_UsesDBInstanceID — IsRunning must also look up
// instance_id from the workspaces table, same as Stop. Mirror of
// TestStop_SendsBothAuthHeaders.
func TestIsRunning_UsesDBInstanceID(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-xyz789"})
var sawInstance string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sawInstance = r.URL.Query().Get("instance_id")
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, `{"state":"running"}`)
}))
defer srv.Close()
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
if _, err := p.IsRunning(context.Background(), "ws-1"); err != nil {
t.Fatalf("IsRunning: %v", err)
}
if sawInstance != "i-xyz789" {
t.Errorf("instance_id query = %q, want i-xyz789 (from DB lookup)", sawInstance)
}
}
// TestIsRunning_EmptyInstanceIDReturnsFalse — IsRunning on a
// workspace with no recorded EC2 instance must report (false, nil) so
// restart cascades re-provision fresh instead of looping on a stale
// row with no backing instance.
func TestIsRunning_EmptyInstanceIDReturnsFalse(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{})
p := &CPProvisioner{baseURL: "http://unused", orgID: "org-1"}
running, err := p.IsRunning(context.Background(), "ws-ghost")
if err != nil {
t.Errorf("IsRunning with empty instance_id should return (false, nil), got err %v", err)
}
if running {
t.Errorf("IsRunning with empty instance_id should return running=false, got true")
}
}
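
Not part of this commit: a minimal sketch of a direct test for resolveInstanceID's SQL path, for readers who want the query itself covered rather than bypassed by primeInstanceIDLookup. It assumes github.com/DATA-DOG/go-sqlmock (added to the imports) and that db.DB, the package-level *sql.DB the production code reads, can be reassigned in a test; both are assumptions about the surrounding repo, not facts established by this diff.

// TestResolveInstanceID_SQLPath_Sketch: hypothetical sketch; exercises the
// real QueryRowContext path behind the stub used by the tests above.
func TestResolveInstanceID_SQLPath_Sketch(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()
    prev := db.DB
    db.DB = mockDB
    t.Cleanup(func() { db.DB = prev })
    mock.ExpectQuery(`SELECT instance_id FROM workspaces WHERE id = \$1`).
        WithArgs("ws-1").
        WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow("i-abc123"))
    got, err := resolveInstanceID(context.Background(), "ws-1")
    if err != nil {
        t.Fatalf("resolveInstanceID: %v", err)
    }
    if got != "i-abc123" {
        t.Errorf("instance_id = %q, want i-abc123", got)
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}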

View File

@@ -71,11 +71,20 @@ fi
# Now running as agent (uid 1000)
# --- Start background token refresh daemon ---
# --- Start background token refresh daemon (with respawn supervision) ---
# Keeps gh CLI and git credentials fresh across the 60-min token TTL.
# Runs in the background; entrypoint continues to exec molecule-runtime.
# Wrapped in a respawn loop so a daemon crash doesn't silently leave the
# workspace stuck on an expired token. Runs in the background; entrypoint
# continues to exec molecule-runtime.
if [ -x /app/scripts/molecule-gh-token-refresh.sh ]; then
nohup /app/scripts/molecule-gh-token-refresh.sh > /dev/null 2>&1 &
nohup bash -c '
while true; do
/app/scripts/molecule-gh-token-refresh.sh
rc=$?
echo "[molecule-gh-token-refresh] daemon exited rc=$rc — respawning in 30s" >&2
sleep 30
done
' > /home/agent/.gh-token-refresh.log 2>&1 &
fi
# --- Initial gh auth setup ---

View File

@@ -2,53 +2,56 @@
# molecule-gh-token-refresh.sh — background daemon that keeps GitHub
# credentials fresh inside Molecule AI workspace containers.
#
# Runs as a background process started by entrypoint.sh. Every
# REFRESH_INTERVAL_SEC (default 45 min = 2700s) it calls the credential
# helper's _refresh_gh action which:
# 1. Fetches a fresh installation token from the platform API
# 2. Updates the local cache (used by git credential helper)
# 3. Runs `gh auth login --with-token` so `gh` CLI stays authenticated
# 4. Writes ~/.gh_token for any scripts that read it
# Started by entrypoint.sh under a respawn wrapper. Every
# REFRESH_INTERVAL_SEC plus jitter (default 45 min + up to 2 min) it calls the
# credential helper's _refresh_gh action.
#
# The daemon logs to stderr (captured by Docker) and is designed to be
# fire-and-forget — if a single refresh fails, it logs the error and
# retries on the next interval. The credential helper itself has a
# fallback chain (cache > API > env var) so a missed refresh is not
# immediately fatal.
# # Jitter
# A 0..120s random offset prevents 39 containers from synchronizing
# their refresh requests against /workspaces/:id/github-installation-token.
#
# Usage (from entrypoint.sh):
# nohup /app/scripts/molecule-gh-token-refresh.sh &
# # Security
# - This daemon NEVER prints token values. Failures log the helper's
# exit code only, not its stderr, so token bytes can't leak via the
# docker log pipeline.
# - The helper script is responsible for chmod 600 on cache files.
#
set -uo pipefail
HELPER_SCRIPT="/app/scripts/molecule-git-token-helper.sh"
HELPER_SCRIPT="${TOKEN_HELPER_SCRIPT:-/app/scripts/molecule-git-token-helper.sh}"
REFRESH_INTERVAL_SEC="${TOKEN_REFRESH_INTERVAL_SEC:-2700}" # 45 min
JITTER_MAX_SEC="${TOKEN_REFRESH_JITTER_SEC:-120}"
INITIAL_DELAY_SEC="${TOKEN_REFRESH_INITIAL_DELAY_SEC:-60}"
log() {
echo "[molecule-gh-token-refresh] $(date -u '+%Y-%m-%dT%H:%M:%SZ') $*" >&2
}
# Wait a short time before the first refresh to let the container finish
# booting and .auth_token to be written by the runtime's register call.
INITIAL_DELAY_SEC="${TOKEN_REFRESH_INITIAL_DELAY_SEC:-60}"
log "starting (interval=${REFRESH_INTERVAL_SEC}s, initial_delay=${INITIAL_DELAY_SEC}s)"
jittered_sleep() {
local base="$1"
local jitter=$((RANDOM % (JITTER_MAX_SEC + 1)))
sleep $((base + jitter))
}
log "starting (interval=${REFRESH_INTERVAL_SEC}s ± ${JITTER_MAX_SEC}s, initial_delay=${INITIAL_DELAY_SEC}s)"
sleep "${INITIAL_DELAY_SEC}"
# Initial refresh — prime the cache + gh auth immediately after boot.
# Discard helper output to /dev/null so the token can't leak via docker logs.
log "initial token refresh"
if bash "${HELPER_SCRIPT}" _refresh_gh 2>&1; then
if bash "${HELPER_SCRIPT}" _refresh_gh >/dev/null 2>&1; then
log "initial refresh succeeded"
else
log "initial refresh failed (will retry in ${REFRESH_INTERVAL_SEC}s)"
log "initial refresh failed (rc=$?) — will retry in ~${REFRESH_INTERVAL_SEC}s"
fi
# Steady-state loop.
while true; do
sleep "${REFRESH_INTERVAL_SEC}"
jittered_sleep "${REFRESH_INTERVAL_SEC}"
log "periodic token refresh"
if bash "${HELPER_SCRIPT}" _refresh_gh 2>&1; then
if bash "${HELPER_SCRIPT}" _refresh_gh >/dev/null 2>&1; then
log "refresh succeeded"
else
log "refresh failed (will retry in ${REFRESH_INTERVAL_SEC}s)"
log "refresh failed (rc=$?) — will retry in ~${REFRESH_INTERVAL_SEC}s"
fi
done

View File

@@ -138,19 +138,32 @@ _fetch_token_from_api() {
return 1
fi
# NOTE: capture stderr to a tmp file (NOT $response) so the response
# body — which contains the token on success — never lands in error
# log lines via $response interpolation.
local _err_file
_err_file=$(mktemp)
response=$(curl -sf \
-H "Authorization: Bearer ${bearer}" \
-H "Accept: application/json" \
--max-time 10 \
"${ENDPOINT}" 2>&1) || {
echo "[molecule-git-token-helper] platform request failed: ${response}" >&2
"${ENDPOINT}" 2>"${_err_file}") || {
local _curl_rc=$?
local _err_msg
_err_msg=$(cat "${_err_file}")
rm -f "${_err_file}"
echo "[molecule-git-token-helper] platform request failed (curl rc=${_curl_rc}): ${_err_msg}" >&2
return 1
}
rm -f "${_err_file}"
# Parse {"token":"ghs_...","expires_at":"..."} with sed (no jq dependency).
token=$(echo "${response}" | sed -n 's/.*"token":"\([^"]*\)".*/\1/p')
if [ -z "${token}" ]; then
echo "[molecule-git-token-helper] empty token in platform response: ${response}" >&2
# SECURITY: the response body MAY contain a token under a different
# JSON key name. Never include $response in this error message —
# log only the size as a coarse debugging signal.
echo "[molecule-git-token-helper] empty token in platform response (body=${#response} bytes)" >&2
return 1
fi