commit 0466dc5f7e — Merge branch 'staging' into fix/main-orgtoken-mocks
41  .coverage-allowlist.txt  (new file)

@@ -0,0 +1,41 @@
# Coverage allowlist — security-critical files that are currently below
# the 10% per-file floor and are being tracked for remediation.
#
# Format: one path per line, relative to workspace-server/.
# Lines starting with # and blank lines are ignored.
#
# Process:
# - A path in this list is WARNED on each CI run, not failed.
# - Each entry must reference a tracking issue and expiry date.
# - On expiry, either the coverage is fixed OR the path graduates to
#   hard-fail (revert the allowlist entry).
#
# See #1823 for the gate design and ratchet plan.

# ============== Active exceptions ==============

# Filed 2026-04-23 — expiry 2026-05-23 (30 days). Tracking: #1823.
# These are the files flagged by the first run of the critical-path gate.
# QA team + platform team share ownership of test coverage remediation.

internal/handlers/a2a_proxy.go
internal/handlers/a2a_proxy_helpers.go
internal/handlers/registry.go
internal/handlers/secrets.go
internal/handlers/tokens.go
internal/handlers/workspace_provision.go
internal/middleware/wsauth_middleware.go

# The following paths matched via looser CRITICAL_PATHS substrings
# (e.g. "registry" matched both internal/registry/ and internal/channels/registry.go).
# Adding them here so the gate can land without blocking staging merges;
# a follow-up PR will tighten CRITICAL_PATHS to exact prefixes so these
# graduate to hard-fail precisely where security-critical.

internal/channels/registry.go
internal/crypto/aes.go
internal/registry/access.go
internal/registry/healthsweep.go
internal/registry/hibernation.go
internal/registry/provisiontimeout.go
internal/wsauth/tokens.go
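Since the allowlist grammar above is consumed by shell in ci.yml (grep -vE for the filter, grep -qxF for the exact match), a small Go sketch of the same semantics may help anyone wiring the list into other tooling. Illustrative only: the helper name and the standalone main are not part of the repo.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// loadAllowlist returns the set of non-comment, non-blank lines from
// .coverage-allowlist.txt, keyed for exact-match lookup (the grep -qxF
// equivalent). Hypothetical helper, not repo code.
func loadAllowlist(path string) (map[string]bool, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	allow := map[string]bool{}
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // comments and blanks are ignored, per the header
		}
		allow[line] = true
	}
	return allow, sc.Err()
}

func main() {
	allow, err := loadAllowlist(".coverage-allowlist.txt")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Exact match only: "internal/handlers/tokens.go" is allowlisted,
	// a sibling like "internal/handlers/tokens_extra.go" is not.
	fmt.Println(allow["internal/handlers/tokens.go"])
}
```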
97  .github/workflows/ci.yml  (vendored)

@@ -81,15 +81,98 @@ jobs:
        continue-on-error: true  # Warn but don't block until codebase is clean
      - name: Run tests with race detection and coverage
        run: go test -race -coverprofile=coverage.out ./...
-     - name: Check coverage baseline
-       run: |
-         COVERAGE=$(go tool cover -func=coverage.out | grep total | awk '{print $3}' | sed 's/%//')
-         echo "Total coverage: ${COVERAGE}%"
-         THRESHOLD=25
-         awk "BEGIN{if ($COVERAGE < $THRESHOLD) exit 1}" || {
-           echo "::error::Coverage ${COVERAGE}% is below the ${THRESHOLD}% threshold"
-           exit 1
-         }
+     - name: Per-file coverage report
+       # Advisory — lists every source file with its coverage so reviewers
+       # can see at a glance where the gaps are. Sorted ascending so the worst
+       # offenders float to the top. Does NOT fail the build; the hard
+       # gate is the threshold check below. (#1823)
+       run: |
+         echo "=== Per-file coverage (worst first) ==="
+         go tool cover -func=coverage.out \
+           | grep -v '^total:' \
+           | awk '{file=$1; sub(/:[0-9][0-9.]*:.*/, "", file); pct=$NF; gsub(/%/,"",pct); s[file]+=pct; c[file]++}
+                  END {for (f in s) printf "%6.1f%% %s\n", s[f]/c[f], f}' \
+           | sort -n
+
+     - name: Check coverage thresholds
+       # Enforces two gates from #1823 Layer 1:
+       #   1. Total floor (25% — ratchet plan in COVERAGE_FLOOR.md).
+       #   2. Per-file floor — non-test .go files in security-critical
+       #      paths with coverage <10% fail the build, UNLESS the file
+       #      path is listed in .coverage-allowlist.txt (acknowledged
+       #      historical debt with a tracking issue + expiry).
+       run: |
+         set -e
+         TOTAL_FLOOR=25
+         # Security-critical paths where a 0%-coverage file is a real risk.
+         CRITICAL_PATHS=(
+           "internal/handlers/tokens"
+           "internal/handlers/workspace_provision"
+           "internal/handlers/a2a_proxy"
+           "internal/handlers/registry"
+           "internal/handlers/secrets"
+           "internal/middleware/wsauth"
+           "internal/crypto"
+         )
+
+         TOTAL=$(go tool cover -func=coverage.out | grep '^total:' | awk '{print $3}' | sed 's/%//')
+         echo "Total coverage: ${TOTAL}%"
+         if awk "BEGIN{exit !($TOTAL < $TOTAL_FLOOR)}"; then
+           echo "::error::Total coverage ${TOTAL}% is below the ${TOTAL_FLOOR}% floor. See COVERAGE_FLOOR.md for ratchet plan."
+           exit 1
+         fi
+
+         # Aggregate per-file coverage → /tmp/perfile.txt: "<fullpath> <pct>"
+         go tool cover -func=coverage.out \
+           | grep -v '^total:' \
+           | awk '{file=$1; sub(/:[0-9][0-9.]*:.*/, "", file); pct=$NF; gsub(/%/,"",pct); s[file]+=pct; c[file]++}
+                  END {for (f in s) printf "%s %.1f\n", f, s[f]/c[f]}' \
+           > /tmp/perfile.txt
+
+         # Build allowlist — paths relative to workspace-server, one per line.
+         # Lines starting with # are comments.
+         ALLOWLIST=""
+         if [ -f ../.coverage-allowlist.txt ]; then
+           ALLOWLIST=$(grep -vE '^(#|[[:space:]]*$)' ../.coverage-allowlist.txt || true)
+         fi
+
+         FAILED=0
+         WARNED=0
+         for path in "${CRITICAL_PATHS[@]}"; do
+           while read -r file pct; do
+             [[ "$file" == *_test.go ]] && continue
+             [[ "$file" == *"$path"* ]] || continue
+             awk "BEGIN{exit !($pct < 10)}" || continue
+
+             # Strip the package-import prefix so we can match .coverage-allowlist.txt
+             # entries written as paths relative to workspace-server/.
+             rel=$(echo "$file" | sed 's|^github.com/Molecule-AI/molecule-monorepo/platform/||')
+
+             if echo "$ALLOWLIST" | grep -qxF "$rel"; then
+               echo "::warning file=workspace-server/$rel::Critical file at ${pct}% coverage (allowlisted, #1823) — fix before expiry."
+               WARNED=$((WARNED+1))
+             else
+               echo "::error file=workspace-server/$rel::Critical file at ${pct}% coverage — must be >=10% (target 80%). See #1823. To acknowledge as known debt, add this path to .coverage-allowlist.txt."
+               FAILED=$((FAILED+1))
+             fi
+           done < /tmp/perfile.txt
+         done
+
+         echo ""
+         echo "Critical-path check: $FAILED new failures, $WARNED allowlisted warnings."
+
+         if [ "$FAILED" -gt 0 ]; then
+           echo ""
+           echo "$FAILED security-critical file(s) have <10% test coverage and are"
+           echo "NOT in the allowlist. These paths handle auth, tokens, secrets, or"
+           echo "workspace provisioning — a 0% file here is the exact gap that let"
+           echo "CWE-22, CWE-78, KI-005 slip through in past incidents. Either:"
+           echo "  (a) add tests to raise coverage above 10%, or"
+           echo "  (b) add the path to .coverage-allowlist.txt with an expiry date"
+           echo "      and a tracking issue reference."
+           exit 1
+         fi
+
  canvas-build:
    name: Canvas (Next.js)
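One caveat on the awk aggregation used in both steps: it averages the per-function percentages that `go tool cover -func` prints, unweighted, so a file with one small covered function and one large uncovered one scores better than its true statement coverage. A statement-weighted figure can be computed from the profile itself; here is a hedged sketch using golang.org/x/tools/cover, an extra dependency the workflow deliberately avoids. Names are illustrative.

```go
package main

import (
	"fmt"
	"os"

	"golang.org/x/tools/cover" // go get golang.org/x/tools/cover
)

// Statement-weighted per-file coverage: covered statements over total
// statements, rather than an unweighted mean of per-function percentages.
func main() {
	profiles, err := cover.ParseProfiles("coverage.out")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, p := range profiles {
		var covered, total int
		for _, b := range p.Blocks {
			total += b.NumStmt
			if b.Count > 0 {
				covered += b.NumStmt
			}
		}
		if total == 0 {
			continue // no executable statements in this file
		}
		fmt.Printf("%6.1f%% %s\n", 100*float64(covered)/float64(total), p.FileName)
	}
}
```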
78  COVERAGE_FLOOR.md  (new file)

@@ -0,0 +1,78 @@
# Coverage Floor

CI applies three coverage checks to `workspace-server` (Go): two enforced
gates and one advisory report. All are defined in
`.github/workflows/ci.yml` → `platform-build` job.

## Current floors (2026-04-23)

| Gate | Threshold | What fails |
|---|---|---|
| **Total floor** | `25%` | `go tool cover -func` reports total below floor |
| **Critical-path per-file floor** | `10%` | Any non-test source file in a security-critical path with coverage <10% |
| **Per-file report** | advisory | Printed in CI log, sorted worst-first, does not fail |

The total floor starts at 25% (unchanged from pre-#1823 to keep this PR strictly
additive). The new protection is the critical-path per-file floor, which
directly closes the gap that prompted the issue. The ratchet plan below begins
a month later so the team can first observe the gate in action.

## Security-critical paths (Gate 2)

Changes to these paths have historically introduced security issues (CWE-22,
CWE-78, KI-005, SSRF) or billing/auth risk. Coverage must not drop to zero.
The CI gate currently matches these as substrings; a sketch contrasting that
with the exact-prefix matching planned in the follow-up appears after this file.

- `internal/handlers/tokens*`
- `internal/handlers/workspace_provision*`
- `internal/handlers/a2a_proxy*`
- `internal/handlers/registry*`
- `internal/handlers/secrets*`
- `internal/middleware/wsauth*`
- `internal/crypto*`
## Ratchet plan

The floor ratchets upward on a fixed cadence. Every ratchet is a PR —
reviewable, reversible, and it creates history. The table below is the
intended schedule.

| Date | Total floor | Critical-path floor | Notes |
|---|---|---|---|
| 2026-04-23 | 25% | 10% | Initial gate (this file). |
| 2026-05-23 | 30% | 20% | First ratchet. |
| 2026-06-23 | 40% | 30% | |
| 2026-07-23 | 50% | 40% | |
| 2026-08-23 | 55% | 50% | |
| 2026-09-23 | 60% | 60% | |
| 2026-10-23 | 70% | 70% | Target steady state. |

The target end state matches the per-role QA prompts, which specify
"coverage >80% on changed files". CI enforces the floor; reviewers still
enforce the per-PR bar.

## Exceptions

If a critical-path file genuinely cannot have coverage above the floor (e.g.
a thin wrapper around a third-party SDK with no branches to test), add an
entry here with:

1. **File**: `internal/handlers/example.go`
2. **Reason**: Why coverage can't hit the floor
3. **Tracking issue**: GitHub issue for the real fix
4. **Expiry**: 14 days from entry date; after expiry either coverage is fixed
   or the issue is closed as "accepted technical debt"

### Active exceptions

*(none — add here if you need to land code that legitimately can't clear the floor)*

## Why this gate exists

Issue #1823: an external audit found critical files at 0% coverage despite
test files with hundreds of lines sitting alongside them. The existing CI
step measured coverage but didn't enforce a meaningful threshold. Any file
could drop from 80% to 0% and CI stayed green, because the single gate
(total ≥25%) ignored the per-file distribution.

This gate makes "no untested critical paths merged" a mechanical property of
the CI, not a behavioural property of QA agents or individual reviewers —
which is the only way to make it survive fleet outages, agent rotations, and
QA process changes.
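To make the over-matching concrete, here is a minimal sketch contrasting the looser substring match (assuming a bare "registry" pattern, as the allowlist comment describes an earlier iteration using) with the exact-prefix match the follow-up PR plans. Illustrative only, not repo code.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	files := []string{
		"internal/registry/access.go",
		"internal/channels/registry.go",
		"internal/handlers/registry.go",
	}

	for _, f := range files {
		// Current CI behaviour: substring match catches all three.
		substr := strings.Contains(f, "registry")
		// Planned tightening: exact prefix only catches the intended dir.
		prefix := strings.HasPrefix(f, "internal/registry/")
		fmt.Printf("%-32s substring=%-5v prefix=%v\n", f, substr, prefix)
	}
}
```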
@@ -451,16 +451,17 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
 		h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{})
 	}

-	// Auto-recovery: if a workspace is marked "failed" or "provisioning" but is
-	// actively sending heartbeats, it has clearly booted successfully. Transition
-	// to "online" so the scheduler and dashboard reflect reality. This catches
-	// cases where the provisioner crashed mid-setup or an earlier error left the
-	// status stale.
-	if currentStatus == "failed" || currentStatus == "provisioning" {
-		if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'online', updated_at = now() WHERE id = $1 AND status IN ('failed', 'provisioning')`, payload.WorkspaceID); err != nil {
-			log.Printf("Heartbeat: failed to auto-recover %s from %s to online: %v", payload.WorkspaceID, currentStatus, err)
+	// Auto-recovery: if a workspace is marked "provisioning" but is actively sending
+	// heartbeats, it has successfully started up. Transition to "online" so the scheduler
+	// and A2A proxy can dispatch tasks to it. The provisioner does not call
+	// /registry/register on container start — only the heartbeat loop does, so this
+	// transition is the only mechanism that moves newly-started workspaces out of
+	// the phantom-idle state. (#1784)
+	if currentStatus == "provisioning" {
+		if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'online', updated_at = now() WHERE id = $1 AND status = 'provisioning'`, payload.WorkspaceID); err != nil {
+			log.Printf("Heartbeat: failed to transition %s from provisioning to online: %v", payload.WorkspaceID, err)
 		} else {
-			log.Printf("Heartbeat: auto-recovered %s from %s to online (heartbeat received)", payload.WorkspaceID, currentStatus)
+			log.Printf("Heartbeat: transitioned %s from provisioning to online (heartbeat received)", payload.WorkspaceID)
 		}
 		h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{
 			"recovered_from": currentStatus,
@@ -134,6 +134,56 @@ func TestHeartbeatHandler_OfflineToOnline(t *testing.T) {
	}
}

// ==================== Heartbeat — provisioning → online recovery (#1784) ====================

func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewRegistryHandler(broadcaster)

	// Expect prevTask SELECT
	mock.ExpectQuery("SELECT COALESCE\\(current_task").
		WithArgs("ws-provisioning").
		WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))

	// Expect heartbeat UPDATE
	mock.ExpectExec("UPDATE workspaces SET").
		WithArgs("ws-provisioning", 0.0, "", 1, 3000, "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// Expect evaluateStatus SELECT — currently provisioning
	mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
		WithArgs("ws-provisioning").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("provisioning"))

	// Expect status transition to online (#1784)
	mock.ExpectExec("UPDATE workspaces SET status = 'online'").
		WithArgs("ws-provisioning").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// Expect RecordAndBroadcast INSERT for WORKSPACE_ONLINE
	mock.ExpectExec("INSERT INTO structure_events").
		WillReturnResult(sqlmock.NewResult(0, 1))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)

	body := `{"workspace_id":"ws-provisioning","error_rate":0.0,"sample_error":"","active_tasks":1,"uptime_seconds":3000}`
	c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.Heartbeat(c)

	if w.Code != http.StatusOK {
		t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}

func TestHeartbeatHandler_BadJSON(t *testing.T) {
	setupTestDB(t)
	setupTestRedis(t)
@@ -95,9 +95,18 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
 		payload.Tier = 1
 	}

-	// Detect runtime from template config.yaml if not specified in request.
-	// Must happen before DB insert so the correct runtime is persisted.
-	if payload.Runtime == "" && payload.Template != "" {
+	// Detect runtime + default model from template config.yaml when the
+	// caller omitted them. Must happen before DB insert so persisted
+	// fields match the template's intent.
+	//
+	// Model default pre-fills the hermes-trap gap (PR #1714 + TemplatePalette
+	// patch): any create path (canvas dialog, TemplatePalette, direct API)
+	// that names a template but forgets a model slug now inherits the
+	// template's `runtime_config.model` — without it, hermes-agent falls
+	// back to its compiled-in Anthropic default and 401s when the user's
+	// key is for a different provider. Non-hermes runtimes are unaffected
+	// (the server still passes model through, they just don't use it).
+	if payload.Template != "" && (payload.Runtime == "" || payload.Model == "") {
 		// #226: payload.Template is attacker-controllable. resolveInsideRoot
 		// rejects absolute paths and any ".." that escapes configsDir so the
 		// provisioner can't be pointed at host directories.
@@ -111,10 +120,32 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
 		if readErr != nil {
 			log.Printf("Create: could not read config.yaml for template %q: %v", payload.Template, readErr)
 		}
-		for _, line := range strings.Split(string(cfgData), "\n") {
-			line = strings.TrimSpace(line)
-			if strings.HasPrefix(line, "runtime:") {
-				payload.Runtime = strings.TrimSpace(strings.TrimPrefix(line, "runtime:"))
+		// Two-pass line scanner: the old parser found top-level `runtime:`
+		// by substring match on trimmed lines. We extend it to also find
+		// the nested `runtime_config.model:` (new format) and top-level
+		// `model:` (legacy format). A minimal state var tracks whether
+		// we're inside the runtime_config block based on indentation.
+		inRuntimeConfig := false
+		for _, rawLine := range strings.Split(string(cfgData), "\n") {
+			// Track indentation to detect block transitions.
+			trimmed := strings.TrimLeft(rawLine, " \t")
+			indented := len(rawLine) > len(trimmed)
+			if !indented {
+				// Left the runtime_config block (or never entered it).
+				inRuntimeConfig = strings.HasPrefix(trimmed, "runtime_config:")
+			}
+			stripped := strings.TrimSpace(rawLine)
+			switch {
+			case payload.Runtime == "" && !indented && strings.HasPrefix(stripped, "runtime:") && !strings.HasPrefix(stripped, "runtime_config"):
+				payload.Runtime = strings.TrimSpace(strings.TrimPrefix(stripped, "runtime:"))
+			case payload.Model == "" && !indented && strings.HasPrefix(stripped, "model:"):
+				// Legacy top-level `model:` — pre-runtime_config templates.
+				payload.Model = strings.Trim(strings.TrimSpace(strings.TrimPrefix(stripped, "model:")), `"'`)
+			case payload.Model == "" && indented && inRuntimeConfig && strings.HasPrefix(stripped, "model:"):
+				// Nested `runtime_config.model:` — current format (hermes etc.).
+				payload.Model = strings.Trim(strings.TrimSpace(strings.TrimPrefix(stripped, "model:")), `"'`)
+			}
+			if payload.Runtime != "" && payload.Model != "" {
+				break
+			}
+		}
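A design note on the scanner above: it parses config.yaml by hand, presumably to keep a YAML dependency out of the create path. For comparison, a hedged sketch of the same extraction with gopkg.in/yaml.v3 follows; the struct and field names are illustrative, not types the handler defines.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// templateCfg mirrors just the fields the Create handler cares about.
// Hypothetical type for illustration only.
type templateCfg struct {
	Runtime       string `yaml:"runtime"`
	Model         string `yaml:"model"` // legacy top-level form
	RuntimeConfig struct {
		Model string `yaml:"model"` // current nested form
	} `yaml:"runtime_config"`
}

func main() {
	cfgData := []byte("runtime: hermes\nruntime_config:\n  model: nousresearch/hermes-4-70b\n")

	var cfg templateCfg
	if err := yaml.Unmarshal(cfgData, &cfg); err != nil {
		panic(err)
	}

	// Prefer the nested form when present, else the legacy top-level one
	// (the hand-rolled scanner fills whichever appears first in the file).
	model := cfg.RuntimeConfig.Model
	if model == "" {
		model = cfg.Model
	}
	fmt.Println(cfg.Runtime, model) // hermes nousresearch/hermes-4-70b
}
```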
@@ -6,6 +6,8 @@ import (
 	"encoding/json"
 	"net/http"
 	"net/http/httptest"
+	"os"
+	"path/filepath"
 	"testing"

 	"github.com/DATA-DOG/go-sqlmock"
@@ -1215,3 +1217,169 @@ func TestWorkspaceUpdate_BudgetLimitOnly_Ignored(t *testing.T) {
		t.Errorf("unexpected DB call for budget_limit: %v", err)
	}
}

// TestWorkspaceCreate_TemplateDefaultsMissingRuntimeAndModel covers the
// hermes-trap case: a caller (TemplatePalette, direct API, script) POSTs
// /workspaces with only a template name + no runtime + no model. The
// handler must read the template's config.yaml and fill in both fields
// BEFORE DB insert — otherwise hermes-agent auto-detects the wrong
// provider and 401s downstream (PR #1714 context).
//
// Uses the nested runtime_config.model format current templates use;
// legacy top-level `model:` is covered by the Legacy test below.
func TestWorkspaceCreate_TemplateDefaultsMissingRuntimeAndModel(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()

	// Stage a hermes-like template inside the configsDir the handler reads.
	configsDir := t.TempDir()
	templateDir := filepath.Join(configsDir, "hermes-template")
	if err := os.MkdirAll(templateDir, 0o755); err != nil {
		t.Fatalf("mkdir: %v", err)
	}
	cfg := []byte(`name: Hermes Agent
tier: 2
runtime: hermes
runtime_config:
  model: nousresearch/hermes-4-70b
`)
	if err := os.WriteFile(filepath.Join(templateDir, "config.yaml"), cfg, 0o644); err != nil {
		t.Fatalf("write cfg: %v", err)
	}

	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", configsDir)

	mock.ExpectBegin()
	// Request omits runtime + model; handler must fill from the template
	// and hand the completed values to the INSERT.
	mock.ExpectExec("INSERT INTO workspaces").
		WithArgs(
			sqlmock.AnyArg(), "Hermes Agent", nil, 1, "hermes",
			sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectCommit()
	mock.ExpectExec("INSERT INTO canvas_layouts").
		WithArgs(sqlmock.AnyArg(), float64(0), float64(0)).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectExec("INSERT INTO structure_events").
		WillReturnResult(sqlmock.NewResult(0, 1))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	body := `{"name":"Hermes Agent","template":"hermes-template"}`
	c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.Create(c)

	if w.Code != http.StatusCreated {
		t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}

// TestWorkspaceCreate_TemplateDefaultsLegacyTopLevelModel covers
// pre-runtime_config templates that declare `model:` at the top level.
// These should still surface the default via the same auto-fill.
func TestWorkspaceCreate_TemplateDefaultsLegacyTopLevelModel(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()

	configsDir := t.TempDir()
	templateDir := filepath.Join(configsDir, "legacy-template")
	if err := os.MkdirAll(templateDir, 0o755); err != nil {
		t.Fatalf("mkdir: %v", err)
	}
	cfg := []byte(`name: Legacy Agent
tier: 1
runtime: langgraph
model: anthropic:claude-sonnet-4-5
`)
	if err := os.WriteFile(filepath.Join(templateDir, "config.yaml"), cfg, 0o644); err != nil {
		t.Fatalf("write cfg: %v", err)
	}

	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", configsDir)

	mock.ExpectBegin()
	mock.ExpectExec("INSERT INTO workspaces").
		WithArgs(
			sqlmock.AnyArg(), "Legacy Agent", nil, 1, "langgraph",
			sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectCommit()
	mock.ExpectExec("INSERT INTO canvas_layouts").
		WithArgs(sqlmock.AnyArg(), float64(0), float64(0)).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectExec("INSERT INTO structure_events").
		WillReturnResult(sqlmock.NewResult(0, 1))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	body := `{"name":"Legacy Agent","template":"legacy-template"}`
	c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.Create(c)

	if w.Code != http.StatusCreated {
		t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
	}
}

// TestWorkspaceCreate_CallerModelOverridesTemplateDefault asserts that
// when the caller passes an explicit `model`, we DO NOT overwrite it
// with the template's default. The pre-fill only happens on empty.
func TestWorkspaceCreate_CallerModelOverridesTemplateDefault(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()

	configsDir := t.TempDir()
	templateDir := filepath.Join(configsDir, "hermes-template")
	if err := os.MkdirAll(templateDir, 0o755); err != nil {
		t.Fatalf("mkdir: %v", err)
	}
	cfg := []byte(`runtime: hermes
runtime_config:
  model: nousresearch/hermes-4-70b
`)
	if err := os.WriteFile(filepath.Join(templateDir, "config.yaml"), cfg, 0o644); err != nil {
		t.Fatalf("write cfg: %v", err)
	}

	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", configsDir)

	mock.ExpectBegin()
	// Caller explicitly chose minimax — template's hermes-4-70b must NOT win.
	// The INSERT only passes runtime to the DB (model goes to agent_card /
	// downstream config); we verify runtime == "hermes" and rely on the
	// absence of a handler error to mean the model passthrough was honored.
	mock.ExpectExec("INSERT INTO workspaces").
		WithArgs(
			sqlmock.AnyArg(), "Custom Hermes", nil, 1, "hermes",
			sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectCommit()
	mock.ExpectExec("INSERT INTO canvas_layouts").
		WithArgs(sqlmock.AnyArg(), float64(0), float64(0)).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectExec("INSERT INTO structure_events").
		WillReturnResult(sqlmock.NewResult(0, 1))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	body := `{"name":"Custom Hermes","template":"hermes-template","model":"minimax/MiniMax-M2.7"}`
	c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.Create(c)

	if w.Code != http.StatusCreated {
		t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
	}
}
@@ -3,6 +3,7 @@ package provisioner
 import (
 	"bytes"
 	"context"
+	"database/sql"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -10,6 +11,8 @@ import (
 	"net/http"
 	"os"
 	"time"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 )

 // CPProvisioner provisions workspace agents by calling the control plane's
@@ -182,8 +185,26 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
 }

 // Stop terminates the workspace's EC2 instance via the control plane.
 //
+// Looks up the actual EC2 instance_id from the workspaces table before
+// calling CP — earlier versions passed workspaceID (a UUID) as the
+// instance_id query param, which CP forwarded to EC2 TerminateInstances,
+// which rejected it with InvalidInstanceID.Malformed (EC2 IDs are i-… not
+// UUIDs). The terminate failure then left the workspace's SG attached,
+// blocking the next provision with InvalidGroup.Duplicate — a full
+// "Save & Restart" crash on SaaS.
 func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
-	url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID)
+	instanceID, err := resolveInstanceID(ctx, workspaceID)
+	if err != nil {
+		return fmt.Errorf("cp provisioner: stop: resolve instance_id: %w", err)
+	}
+	if instanceID == "" {
+		// No instance was ever provisioned (or already deprovisioned and
+		// the column was cleared). Nothing to terminate — idempotent.
+		log.Printf("CP provisioner: Stop for %s — no instance_id on file, nothing to do", workspaceID)
+		return nil
+	}
+	url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, instanceID)
 	req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
 	p.provisionAuthHeaders(req)
 	resp, err := p.httpClient.Do(req)
@@ -194,6 +215,35 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
 	return nil
 }

+// resolveInstanceID reads workspaces.instance_id for the given workspace.
+// Returns ("", nil) when the row exists but has no instance_id recorded
+// (edge case for external workspaces or stale rows). Returns an error
+// only on real DB failures, not on missing rows — callers (Stop,
+// IsRunning) treat the empty string as "nothing to act on."
+//
+// Exposed as a package var so tests can substitute a stub without
+// standing up a sqlmock just to unblock the Stop/IsRunning code path.
+// Production code never reassigns it.
+var resolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
+	if db.DB == nil {
+		// Defensive: NewCPProvisioner never runs without db.DB being
+		// set in main(). If somehow nil, treat as "no instance" rather
+		// than panicking in the Stop/IsRunning path.
+		return "", nil
+	}
+	var instanceID sql.NullString
+	err := db.DB.QueryRowContext(ctx,
+		`SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
+	).Scan(&instanceID)
+	if err != nil && err != sql.ErrNoRows {
+		return "", err
+	}
+	if !instanceID.Valid {
+		return "", nil
+	}
+	return instanceID.String, nil
+}
+
 // IsRunning checks workspace EC2 instance state via the control plane.
 //
 // Contract (matches the Docker Provisioner.IsRunning contract —
@@ -219,7 +269,18 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
 // Both callers are happy with (true, err); callers that need the
 // previous (false, err) shape must inspect err themselves.
 func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
-	url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID)
+	instanceID, err := resolveInstanceID(ctx, workspaceID)
+	if err != nil {
+		// Treat DB errors the same as transport errors — (true, err) keeps
+		// a2a_proxy on the alive path and logs the signal.
+		return true, fmt.Errorf("cp provisioner: status: resolve instance_id: %w", err)
+	}
+	if instanceID == "" {
+		// No instance recorded. Report "not running" cleanly (no error)
+		// so restart cascades can trigger a fresh provision.
+		return false, nil
+	}
+	url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, instanceID)
 	req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
 	p.provisionAuthHeaders(req)
 	resp, err := p.httpClient.Do(req)
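Because the (true, err) contract is easy to misread, a compressed sketch of how the two documented callers consume IsRunning may help. This is hypothetical caller code written against the package's existing imports, not code from this diff; reprovisioning is reduced to a log line.

```go
// checkAndRecover is a hypothetical caller, illustrating the contract the
// IsRunning doc comment describes. Not repo code.
func checkAndRecover(ctx context.Context, p *CPProvisioner, workspaceID string) {
	running, err := p.IsRunning(ctx, workspaceID)
	switch {
	case err != nil:
		// running is true here by contract: a2a_proxy keeps dispatching,
		// the sweeper logs and skips. Either way, don't tear down.
		log.Printf("status check failed for %s, assuming alive: %v", workspaceID, err)
	case !running:
		// Clean "not running" (including empty instance_id): safe for a
		// restart cascade to provision fresh.
		log.Printf("%s not running — eligible for re-provision", workspaceID)
	}
}
```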
@@ -11,6 +11,23 @@ import (
 	"time"
 )

+// primeInstanceIDLookup swaps resolveInstanceID for a stub that returns
+// the mapped instance_id for the given workspace_id, or "" for anything
+// not in the map. Cheaper than standing up a sqlmock since Stop/IsRunning
+// tests mostly don't care about the SQL path — they're testing the CP
+// HTTP interaction downstream of the lookup.
+func primeInstanceIDLookup(t *testing.T, pairs map[string]string) {
+	t.Helper()
+	prev := resolveInstanceID
+	resolveInstanceID = func(_ context.Context, wsID string) (string, error) {
+		if id, ok := pairs[wsID]; ok {
+			return id, nil
+		}
+		return "", nil
+	}
+	t.Cleanup(func() { resolveInstanceID = prev })
+}
+
 // TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't
 // have a MOLECULE_ORG_ID, and the provisioner must refuse to construct
 // rather than silently phone home to the prod CP with an empty tenant.
@@ -267,6 +284,12 @@ func TestStart_TransportFailureSurfaces(t *testing.T) {
 // platform-wide shared secret AND the per-tenant admin token, or the
 // CP will 401.
 func TestStop_SendsBothAuthHeaders(t *testing.T) {
+	// resolveInstanceID looks up the real EC2 id from the workspaces
+	// table; previously this test passed when the provisioner buggily
+	// reused workspaceID AS instance_id. Now we assert the correct
+	// EC2 id round-trips.
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"})
+
 	var sawBearer, sawAdminToken, sawMethod, sawPath string
 	var sawInstance string
 	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -295,8 +318,8 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) {
 	if sawPath != "/cp/workspaces/ws-1" {
 		t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath)
 	}
-	if sawInstance != "ws-1" {
-		t.Errorf("instance_id query = %q, want ws-1", sawInstance)
+	if sawInstance != "i-abc123" {
+		t.Errorf("instance_id query = %q, want i-abc123 (from DB lookup, NOT the workspace UUID)", sawInstance)
 	}
 	if sawBearer != "Bearer s3cret" {
 		t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
@@ -310,6 +333,7 @@ func TestStop_SendsBothAuthHeaders(t *testing.T) {
 // teardown call hits a dead CP, the error must surface so the caller
 // knows the workspace might still be running and needs retry.
 func TestStop_TransportErrorSurfaces(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-abc123"})
 	p := &CPProvisioner{
 		baseURL: "http://127.0.0.1:1",
 		orgID:   "org-1",
@@ -327,6 +351,7 @@ func TestStop_TransportErrorSurfaces(t *testing.T) {
 // TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose
 // a bool ("running"/"pending"/"terminated" → true only for "running").
 func TestIsRunning_ParsesStateField(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-parsesstatefield"})
 	cases := map[string]bool{
 		"running": true,
 		"pending": false,
@@ -364,6 +389,7 @@ func TestIsRunning_ParsesStateField(t *testing.T) {
 // require the same per-tenant auth because they leak public_ip +
 // private_ip to the caller.
 func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-auth-headers"})
 	var sawBearer, sawAdminToken string
 	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		sawBearer = r.Header.Get("Authorization")
@@ -398,6 +424,7 @@ func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
 // The sweeper (healthsweep.go) inspects err independently and skips
 // on any error, so (true, err) is equally safe for that caller.
 func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-transport"})
 	p := &CPProvisioner{
 		baseURL: "http://127.0.0.1:1",
 		orgID:   "org-1",
@@ -418,6 +445,7 @@ func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
 // the sweeper would see the workspace as not-running. Now every
 // non-2xx is an error the caller can log + retry.
 func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-non2xx"})
 	cases := []struct {
 		name   string
 		status int
@@ -459,6 +487,7 @@ func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
 // a middleware glitch (HTML error page with 200) from looking like
 // "workspace stopped".
 func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-malformed"})
 	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(200)
 		_, _ = io.WriteString(w, "<html>maintenance mode</html>")
@@ -486,6 +515,7 @@ func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
 // the documented contract values rather than simulating the whole
 // a2a_proxy flow.
 func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-contract"})
 	// Simulate every error path and assert running==true for each.
 	t.Run("transport error", func(t *testing.T) {
 		p := &CPProvisioner{
@@ -559,6 +589,7 @@ func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
 // only needs the prefix to produce a value, so the decode succeeds —
 // and the LimitReader enforces the cap regardless.
 func TestIsRunning_BoundedBodyRead(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-bounded"})
 	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 		w.WriteHeader(200)
 		// Valid JSON prefix, then pad well past the 64 KiB cap.
@@ -628,3 +659,67 @@ func TestGetConsoleOutput_UsesAdminBearer(t *testing.T) {
 		t.Errorf("bearer = %q, want Bearer admin-api-key (NOT the provision secret)", sawBearer)
 	}
 }
+
+// TestStop_EmptyInstanceIDIsNoop — when workspaces.instance_id is NULL
+// (e.g. a row that was never provisioned, or deprovisioned and cleared),
+// Stop should be a no-op instead of sending a malformed CP request.
+// Regression guard: previously Stop sent workspaceID as instance_id
+// even when no EC2 had been booked, causing CP → EC2 to 400.
+func TestStop_EmptyInstanceIDIsNoop(t *testing.T) {
+	// Empty map → lookup returns ("", nil) for any workspace.
+	primeInstanceIDLookup(t, map[string]string{})
+
+	hit := false
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		hit = true
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer srv.Close()
+
+	p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
+	if err := p.Stop(context.Background(), "ws-ghost"); err != nil {
+		t.Fatalf("Stop with empty instance_id should no-op, got err %v", err)
+	}
+	if hit {
+		t.Errorf("Stop contacted CP even though instance_id was empty")
+	}
+}
+
+// TestIsRunning_UsesDBInstanceID — IsRunning must also look up
+// instance_id from the workspaces table, same as Stop. Mirror of
+// TestStop_SendsBothAuthHeaders.
+func TestIsRunning_UsesDBInstanceID(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{"ws-1": "i-xyz789"})
+
+	var sawInstance string
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		sawInstance = r.URL.Query().Get("instance_id")
+		w.WriteHeader(http.StatusOK)
+		_, _ = io.WriteString(w, `{"state":"running"}`)
+	}))
+	defer srv.Close()
+
+	p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
+	if _, err := p.IsRunning(context.Background(), "ws-1"); err != nil {
+		t.Fatalf("IsRunning: %v", err)
+	}
+	if sawInstance != "i-xyz789" {
+		t.Errorf("instance_id query = %q, want i-xyz789 (from DB lookup)", sawInstance)
+	}
+}
+
+// TestIsRunning_EmptyInstanceIDReturnsFalse — IsRunning on a
+// workspace with no recorded EC2 instance must report (false, nil) so
+// restart cascades re-provision fresh instead of looping on a stale
+// row with no backing instance.
+func TestIsRunning_EmptyInstanceIDReturnsFalse(t *testing.T) {
+	primeInstanceIDLookup(t, map[string]string{})
+	p := &CPProvisioner{baseURL: "http://unused", orgID: "org-1"}
+	running, err := p.IsRunning(context.Background(), "ws-ghost")
+	if err != nil {
+		t.Errorf("IsRunning with empty instance_id should return (false, nil), got err %v", err)
+	}
+	if running {
+		t.Errorf("IsRunning with empty instance_id should return running=false, got true")
+	}
+}