Compare commits

..

1 Commits

Author SHA1 Message Date
devops-engineer 44ab45720f ci(gate): close all-required name-vs-coverage hole via cross-workflow drift check (F4)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
CI / Python Lint & Test (pull_request) Successful in 3s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 7s
E2E API Smoke Test / detect-changes (pull_request) Successful in 6s
E2E Chat / detect-changes (pull_request) Successful in 7s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 8s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 4s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 3s
E2E Chat / E2E Chat (pull_request) Successful in 3s
CI / Detect changes (pull_request) Successful in 14s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 11s
CI / Canvas (Next.js) (pull_request) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 8s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 11s
CI / Canvas Deploy Status (pull_request) Successful in 2s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 5s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 7s
CI / Platform (Go) (pull_request) Successful in 8s
sop-checklist / review-refire (pull_request_target) Has been skipped
gate-check-v3 / gate-check (pull_request_target) Successful in 7s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 12s
security-review / approved (pull_request_target) Failing after 5s
CI / all-required (pull_request) Successful in 10s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
qa-review / approved (pull_request_target) Failing after 26s
sop-checklist / all-items-acked (pull_request_target) Successful in 25s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 52s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m7s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m2s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 1m17s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m15s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m21s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m16s
Local Provision Lifecycle E2E / Local Provision Lifecycle E2E (stub) (pull_request) Failing after 4m26s
Local Provision Lifecycle E2E / Local Provision Lifecycle E2E (real image + MiniMax LLM, advisory) (pull_request) Failing after 7m35s
`CI / all-required (pull_request)` is a fail-closed aggregator over the CI
workflow's OWN jobs (plain `needs:` + explicit per-need `result == success`
assertion). That mechanism is correct. But its name implies it covers every
branch-protection required check, which it does NOT and CANNOT: Gitea Actions
has no cross-workflow `needs:`, so the sibling required workflows
`E2E API Smoke Test` (e2e-api.yml) and `Handlers Postgres Integration`
(handlers-postgres-integration.yml) emit their own status contexts that
`all-required` structurally cannot gate.

Observed latent hole (core PR #1086 @ 9136d05a): `CI / all-required` posted
success while `E2E API Smoke Test` (id 48) and `Handlers Postgres Integration`
(id 47) had already posted failure. Not currently exploitable — main BP
requires all three contexts independently — but if BP were ever trimmed to
just `CI / all-required` on the false assumption that it covers them, a
red-CI PR would look mergeable (the falsely-GREEN-aggregate class, inverse of
#2448's green-but-empty).

Root cause is NOT a bug in the aggregator (it is fail-closed for its scope);
it is the missing cross-workflow coverage guarantee. ci-required-drift.py's
F2 was meant to catch BP contexts with no emitter, but it is scoped to the
hard-coded lowercase literal `ci / ` prefix + bare job-KEYs — a shape that
does not match this repo (workflow is `name: CI`; CI jobs set their own
`name:`), so F2 is effectively dormant here.

Fix (robust, minimal, behavior-based):
- F4 in ci-required-drift.py: parse EVERY workflow's real `name:` + each
  job's `name|key` and assert every BP `status_check_contexts` entry has a
  live emitting workflow. This is the case-correct, repo-wide generalization
  of F2 and closes the inverse-of-F2 hole — a renamed/deleted sibling
  workflow that BP still requires now fails drift loudly instead of degrading
  to a silent absent-as-pending advisory gate. This is what keeps the
  `all-required` name honest at the repo level.
- Document the aggregator's SCOPE on the `all-required` job in ci.yml: it
  covers CI's own jobs only; E2E + Handlers MUST stay required in BP
  independently; do not trim BP to just `CI / all-required`.
- 7 new unit tests: emitter computation (job name>key, no-name, dir union,
  skip-unparseable) + detect_drift F4 silent-when-emitted, fires-on-stale-
  cross-workflow-context, and no-false-positive-on-lone-context.

All 23 ci-required-drift tests pass; 28 incl. ci-workflow-bookkeeping;
lint-workflow-yaml clean (59 files, 0 warnings). No existing behavior changed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 04:26:26 +00:00
6 changed files with 261 additions and 545 deletions
+104
View File
@@ -24,6 +24,17 @@ Three failure classes:
F3 (B) and (C) are not set-equal. Audit env wider than protection
→ audit flags non-force-merges as force; narrower → real
force-merges are missed.
F4 Context in (B) is emitted by NO workflow in .gitea/workflows/ at
all (repo-wide, case-correct generalization of F2, which only
covers `ci / `-prefixed names). This is the inverse-of-F2 hole and
the one that makes the `CI / all-required` aggregator's
name-vs-coverage gap safe: `all-required` is fail-closed over CI's
OWN jobs but CANNOT cover sibling required workflows
(`E2E API Smoke Test`, `Handlers Postgres Integration` — Gitea has
no cross-workflow `needs:`). F4 verifies each cross-workflow
required context still has a live emitter, so a renamed/deleted
sibling workflow that BP still requires is caught instead of
degrading to a silent absent-as-pending advisory gate.
Idempotency:
Searches OPEN issues by exact title prefix
@@ -380,6 +391,68 @@ def expected_context(job_key: str, workflow_name: str = "ci") -> str:
return f"{workflow_name} / {job_key} (pull_request)"
def workflow_emitted_contexts(wf_doc: dict) -> set[str]:
"""The set of `pull_request` status-check contexts a SINGLE workflow
emits, computed from its real `name:` + each job's `name or key`.
Gitea reports a context as `{workflow.name} / {job.name|job.key}
(pull_request)`. Unlike `expected_context()` (which hard-codes the
lowercase literal `ci` and the bare job-KEY — a shape that does NOT
match this repo, whose workflow is `name: CI` and whose CI jobs DO
set per-job `name:`), this reads the authoritative names straight
from the parsed YAML, so the contexts it produces are byte-equal to
what BP records. Used by F4 (cross-workflow emitter existence).
Jobs whose `if:` gates on `github.event_name`/`github.ref` are still
emitters on the events they DO run — they remain in the set; F4 only
asserts *existence of an emitter*, never that it ran on a given
trigger."""
name = wf_doc.get("name")
if not isinstance(name, str) or not name:
return set()
jobs = wf_doc.get("jobs")
if not isinstance(jobs, dict):
return set()
out: set[str] = set()
for key, spec in jobs.items():
job_name = key
if isinstance(spec, dict) and isinstance(spec.get("name"), str) and spec["name"]:
job_name = spec["name"]
out.add(f"{name} / {job_name} (pull_request)")
return out
def all_emitted_contexts(workflows_dir: str = ".gitea/workflows") -> set[str]:
"""Union of `pull_request` contexts emitted by EVERY workflow in the
repo. F4 uses this to assert that each BP-required
`status_check_contexts` entry corresponds to a real emitting
workflow+job — closing the inverse-of-F2 hole where BP requires a
context that NO workflow produces (e.g. a sibling workflow like
`E2E API Smoke Test` or `Handlers Postgres Integration` was renamed
or deleted while still required, leaving BP demanding a green it can
never receive; Gitea treats absent-as-pending → silent advisory
gate). This is what makes the misleadingly-named `CI / all-required`
aggregator safe at the repo level: it only covers CI's own jobs, but
F4 guarantees the cross-workflow required contexts it CANNOT cover
are real and present."""
import glob as _glob
emitted: set[str] = set()
for path in sorted(_glob.glob(os.path.join(workflows_dir, "*.yml"))):
try:
with open(path, encoding="utf-8") as f:
doc = yaml.safe_load(f)
except (OSError, yaml.YAMLError):
# A single unparseable sibling workflow must not blind F4 to
# the rest. Skip it loudly; lint-workflow-yaml gates parse
# validity separately.
sys.stderr.write(f"::warning::F4: could not parse {path}, skipping\n")
continue
if isinstance(doc, dict):
emitted |= workflow_emitted_contexts(doc)
return emitted
# --------------------------------------------------------------------------
# Drift detection
# --------------------------------------------------------------------------
@@ -531,6 +604,36 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
+ "\n".join(f" - {c}" for c in stale_protection)
)
# ----- F4: cross-workflow required context has no emitting workflow -----
# F2 (above) is scoped to `ci / `-prefixed contexts ONLY, and built
# from the hard-coded lowercase literal `ci` + bare job-keys — a shape
# that does NOT match this repo (workflow is `name: CI`, jobs set their
# own `name:`), so F2 is effectively dormant here. F4 is the
# case-correct, REPO-WIDE generalization: it parses every workflow's
# real `name:` + job `name|key` and asserts that EVERY BP-required
# context is actually emitted by some workflow.
#
# This is the gate that makes the `CI / all-required` aggregator's
# name-vs-coverage gap safe. `all-required` is fail-closed over CI's
# OWN jobs but — by Gitea's design (no cross-workflow `needs:`) — it
# CANNOT and does not cover sibling required workflows
# (`E2E API Smoke Test`, `Handlers Postgres Integration`). Those MUST
# be listed in BP independently. F4 verifies each such BP context
# still has a live emitter, so the inverse-of-F2 hole — BP requires a
# context that no workflow produces (rename/delete a sibling workflow
# while still required → Gitea treats absent-as-pending → silent
# advisory gate, and a red PR can look mergeable) — is caught.
repo_emitted = all_emitted_contexts(os.path.dirname(CI_WORKFLOW_PATH))
unemitted = sorted(c for c in contexts if c not in repo_emitted)
if unemitted:
findings.append(
"F4 — branch_protections/{br}.status_check_contexts entries that "
"NO workflow in .gitea/workflows/ emits "
"(stale required name → silent advisory gate; a red PR can look "
"mergeable):\n".format(br=branch)
+ "\n".join(f" - {c}" for c in unemitted)
)
# ----- F3: audit env vs protection contexts (set-equal) -----
only_in_env = sorted(env_set - contexts)
only_in_protection = sorted(contexts - env_set)
@@ -556,6 +659,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
"protection_contexts": sorted(contexts),
"audit_env_checks": sorted(env_set),
"expected_contexts": sorted(emitted_contexts),
"repo_emitted_contexts": sorted(repo_emitted),
}
return findings, debug
-6
View File
@@ -26,12 +26,6 @@ PROFILES: dict[str, dict[str, str]] = {
"handlers": (
r"^workspace-server/internal/handlers/"
r"|^workspace-server/internal/wsauth/"
# #2148: registry-auth real-PG integration tests (CanCommunicate
# parent_id hierarchy lives in internal/registry; org-admin token
# revoke/validate lives in internal/orgtoken) run in this same
# workflow, so a regression in either package MUST trigger the job.
r"|^workspace-server/internal/registry/"
r"|^workspace-server/internal/orgtoken/"
# #2149: the scheduler real-PG integration tests run in this same
# workflow (they reuse its migrated Postgres), so changes to the
# scheduler package must trigger the job too.
@@ -275,3 +275,125 @@ def test_detect_drift_no_f1_when_needs_empty_even_with_jobs():
findings, _ = drift.detect_drift("main")
assert not any("F1 —" in f for f in findings)
# ---------------------------------------------------------------------------
# F4 — cross-workflow required-context emitter existence
# (closes the `CI / all-required` name-vs-coverage hole: the sentinel is
# fail-closed over CI's own jobs but CANNOT cover sibling required
# workflows — Gitea has no cross-workflow `needs:` — so F4 guarantees each
# BP-required context still has a live emitting workflow.)
# ---------------------------------------------------------------------------
def test_workflow_emitted_contexts_uses_job_name_over_key():
"""Job `name:` wins over key; missing name falls back to key."""
doc = {
"name": "E2E API Smoke Test",
"jobs": {
"detect-changes": {}, # no name -> key
"e2e-api": {"name": "E2E API Smoke Test"},
},
}
got = drift.workflow_emitted_contexts(doc)
assert got == {
"E2E API Smoke Test / detect-changes (pull_request)",
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
}
def test_workflow_emitted_contexts_empty_when_no_name():
"""A workflow with no top-level `name:` emits nothing F4 can match."""
assert drift.workflow_emitted_contexts({"jobs": {"x": {}}}) == set()
def test_all_emitted_contexts_unions_workflow_dir(tmp_path):
"""all_emitted_contexts globs *.yml and unions their emitter sets."""
wf = tmp_path / "wf"
wf.mkdir()
(wf / "a.yml").write_text(
"name: CI\njobs:\n all-required:\n runs-on: x\n", encoding="utf-8"
)
(wf / "b.yml").write_text(
"name: Handlers Postgres Integration\n"
"jobs:\n integration:\n name: Handlers Postgres Integration\n"
" runs-on: x\n",
encoding="utf-8",
)
got = drift.all_emitted_contexts(str(wf))
assert "CI / all-required (pull_request)" in got
assert "Handlers Postgres Integration / Handlers Postgres Integration (pull_request)" in got
def test_all_emitted_contexts_skips_unparseable(tmp_path):
"""A single broken sibling workflow must not blind F4 to the rest."""
wf = tmp_path / "wf"
wf.mkdir()
(wf / "good.yml").write_text("name: CI\njobs:\n j:\n runs-on: x\n", encoding="utf-8")
(wf / "bad.yml").write_text("name: [unterminated\n : : :\n", encoding="utf-8")
got = drift.all_emitted_contexts(str(wf))
assert "CI / j (pull_request)" in got
# A BP fixture that includes the two cross-workflow required contexts.
_BP_WITH_SIBLINGS = {
"status_check_contexts": [
"CI / all-required (pull_request)",
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
"Handlers Postgres Integration / Handlers Postgres Integration (pull_request)",
]
}
# The matching set of repo-wide emitted contexts (what a correct repo produces).
_EMITTED_OK = {
"CI / all-required (pull_request)",
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
"Handlers Postgres Integration / Handlers Postgres Integration (pull_request)",
}
def test_detect_drift_f4_silent_when_all_contexts_emitted():
"""No F4 when every BP context has a live emitting workflow."""
ci = _make_ci_doc({"all-required": {}})
audit = _make_audit_doc(sorted(_BP_WITH_SIBLINGS["status_check_contexts"]))
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "api", return_value=(200, _BP_WITH_SIBLINGS)):
with patch.object(drift, "all_emitted_contexts", return_value=set(_EMITTED_OK)):
findings, debug = drift.detect_drift("main")
assert not any("F4 —" in f for f in findings)
assert debug["repo_emitted_contexts"] == sorted(_EMITTED_OK)
def test_detect_drift_f4_fires_on_stale_cross_workflow_context():
"""The core gate-hole regression: BP requires a cross-workflow context
(e.g. a renamed/deleted sibling workflow) that NO workflow emits.
F4 must fire — this is the inverse-of-F2 hole that makes a red PR look
mergeable if BP is ever trimmed/renamed around `CI / all-required`."""
ci = _make_ci_doc({"all-required": {}})
audit = _make_audit_doc(sorted(_BP_WITH_SIBLINGS["status_check_contexts"]))
# Handlers workflow got renamed -> its OLD BP context now has no emitter.
emitted_after_rename = {
"CI / all-required (pull_request)",
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
# Handlers context absent (renamed away)
}
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "api", return_value=(200, _BP_WITH_SIBLINGS)):
with patch.object(drift, "all_emitted_contexts", return_value=emitted_after_rename):
findings, _ = drift.detect_drift("main")
assert any("F4 —" in f for f in findings)
assert any("Handlers Postgres Integration" in f for f in findings)
def test_detect_drift_f4_catches_all_required_only_trim():
"""If BP is trimmed to JUST `CI / all-required` but E2E/Handlers are
still real workflows, F4 does NOT fire (no stale context) — but F3b
(env vs BP) / operator policy must keep them required. This asserts F4
does not false-positive on a correctly-emitted lone context."""
bp = {"status_check_contexts": ["CI / all-required (pull_request)"]}
ci = _make_ci_doc({"all-required": {}})
audit = _make_audit_doc(["CI / all-required (pull_request)"])
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "api", return_value=(200, bp)):
with patch.object(drift, "all_emitted_contexts", return_value=set(_EMITTED_OK)):
findings, _ = drift.detect_drift("main")
assert not any("F4 —" in f for f in findings)
+21
View File
@@ -500,6 +500,27 @@ jobs:
all-required:
# Aggregator sentinel — RFC internal#219 §2 (Phase 4 — closes internal#286).
#
# ── SCOPE (read before trusting the name) ─────────────────────────
# "all-required" means "all of THIS workflow's (CI's) required jobs"
# — NOT "all of branch-protection's required checks". It is fail-
# closed over its `needs:` (the CI jobs below), but Gitea Actions has
# NO cross-workflow `needs:`, so this sentinel STRUCTURALLY CANNOT and
# does not cover sibling required workflows that live in their own
# files — notably:
# • `E2E API Smoke Test` (.gitea/workflows/e2e-api.yml)
# • `Handlers Postgres Integration`(.gitea/workflows/handlers-postgres-integration.yml)
# Those emit their OWN status contexts and MUST be listed in branch
# protection `status_check_contexts` INDEPENDENTLY of this sentinel.
# They are today; do NOT trim BP down to just `CI / all-required` on
# the assumption that it covers them — it does not, and a red E2E /
# Handlers run would then look mergeable (observed: core PR #1086 @
# 9136d05a — `CI / all-required` green while E2E (id 48) + Handlers
# (id 47) were red; not exploitable only because BP still requires
# all three). The cross-workflow coverage is enforced separately by
# ci-required-drift.py's F4 check (every BP context must have a live
# emitting workflow), which is what keeps this name honest.
# ──────────────────────────────────────────────────────────────────
#
# Emits `CI / all-required (<event>)` where <event> is the workflow trigger
# (e.g. `CI / all-required (pull_request)`, `CI / all-required (push)`).
# Branch protection requires the event-suffixed name —
@@ -272,6 +272,20 @@ func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
}
}
// QueueDepth returns the number of currently-queued (not dispatched/completed)
// items for a workspace. Used by the busy-return response body so callers
// can see how many ahead of them.
func QueueDepth(ctx context.Context, workspaceID string) int {
var n int
if err := db.DB.QueryRowContext(ctx,
`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`,
workspaceID,
).Scan(&n); err != nil {
log.Printf("A2AQueue: QueueDepth query failed for workspace %s: %v", workspaceID, err)
}
return n
}
// DropStaleQueueItems marks queued items older than maxAge as 'dropped' with a
// system-generated reason so PM agents stop processing stale post-incident noise.
// Called with a workspaceID to scope cleanup to one workspace, or empty to sweep
@@ -1,539 +0,0 @@
//go:build integration
// +build integration
// registry_auth_integration_test.go — REAL Postgres integration tests for
// the registry-auth + cross-tenant security boundary (issue #2148).
//
// Run with:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply migrations 001 (workspaces), 020 (workspace_auth_tokens),
// # 035/036 (org_api_tokens + org_id), 043 (status enum), 044
// # (platform_inbound_secret), 045 (delivery_mode) — CI applies the
// # full migrations/ set in lexicographic order (apply-all-or-skip).
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run '^TestIntegration_(RegistryRowState|WSAuth|CanCommunicate|OrgToken)'
//
// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on
// every PR that touches workspace-server/internal/{handlers,wsauth,
// registry,orgtoken}/** OR workspace-server/migrations/**. The
// detect-changes `handlers-postgres` profile was widened to include the
// registry + orgtoken packages in the same PR that added this file so a
// regression in CanCommunicate / orgtoken.Revoke actually triggers the
// suite (#2148).
//
// Why these are NOT plain unit tests
// ----------------------------------
// The strict-sqlmock unit tests in wsauth/tokens_test.go,
// orgtoken/tokens_test.go, and registry/access_test.go pin which SQL
// statements fire — they are fast and let us iterate without a DB. But
// sqlmock asserts the SQL TEXT, not that a real Postgres ENFORCES the
// security predicate. The whole value of this package is the
// cross-tenant non-leak boundary:
//
// - wsauth.ValidateToken binds a bearer to ONE workspace_id; sqlmock
// cannot prove the JOIN on workspaces + the workspaceID equality
// actually rejects a token replayed against a different workspace,
// or a token whose workspace was soft-removed.
// - registry.CanCommunicate walks the parent_id chain in real rows;
// sqlmock returns canned rows so it can never catch a query that
// leaks a SIBLING under a different org root, or a self→self / cross-
// tenant decision that depends on the actual stored parent_id.
// - orgtoken.Revoke / Validate depend on the partial-index +
// revoked_at IS NULL predicate landing the row state; sqlmock is
// satisfied by "an UPDATE fired".
// - the registry register/heartbeat #73 guard
// (WHERE workspaces.status IS DISTINCT FROM 'removed' /
// status != 'removed') is a ROW-STATE invariant: a late
// register/heartbeat MUST NOT resurrect a soft-deleted tombstone.
// sqlmock cannot observe that the row stayed 'removed'.
//
// These tests close those gaps by booting a real Postgres, running the
// production functions (and, for register/heartbeat, replaying the exact
// production statement documented at registry.go:393 / registry.go:604),
// and SELECTing the row to verify the observable state.
package handlers
import (
"context"
"database/sql"
"os"
"testing"
"time"
mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/orgtoken"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/registry"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/wsauth"
_ "github.com/lib/pq"
)
// integrationAuthDB opens a connection from $INTEGRATION_DB_URL (skipping
// the test if unset), wipes the registry-auth tables for isolation, and
// hot-swaps the package-level mdb.DB so production functions that read the
// global (registry.CanCommunicate → registry.getWorkspaceRef) see this
// same connection. Restores the previous global + closes the conn via
// t.Cleanup.
//
// NOT SAFE FOR t.Parallel() — it mutates the package global and owns the
// tables for the duration of the test. This mirrors integrationDB in
// delegation_ledger_integration_test.go but wipes the auth tables (not
// delegations) and is kept separate so each suite's wipe step is local.
//
// Wipe order respects FKs: workspace_auth_tokens + org_api_tokens
// reference workspaces, so they go first. org_api_tokens.org_id is
// ON DELETE SET NULL and workspace_auth_tokens.workspace_id is
// ON DELETE CASCADE, but an explicit ordered DELETE keeps the intent
// obvious and avoids leaving rows behind if the FK actions ever change.
func integrationAuthDB(t *testing.T) *sql.DB {
t.Helper()
url := os.Getenv("INTEGRATION_DB_URL")
if url == "" {
t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)")
}
conn, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("open: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := conn.PingContext(ctx); err != nil {
t.Fatalf("ping: %v", err)
}
for _, tbl := range []string{"workspace_auth_tokens", "org_api_tokens", "workspaces"} {
if _, err := conn.ExecContext(ctx, "DELETE FROM "+tbl); err != nil {
t.Fatalf("cleanup %s: %v", tbl, err)
}
}
prev := mdb.DB
mdb.DB = conn
t.Cleanup(func() {
mdb.DB = prev
conn.Close()
})
return conn
}
// insertWorkspace creates a workspace row with the given status and
// optional parent, returning the DB-generated UUID. parentID may be the
// empty string for a root-level workspace (parent_id IS NULL). status
// must be a valid workspaces.status enum value (043 migration).
func insertWorkspace(t *testing.T, conn *sql.DB, name, status, parentID string) string {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var parent any
if parentID != "" {
parent = parentID
}
var id string
err := conn.QueryRowContext(ctx, `
INSERT INTO workspaces (name, status, parent_id, delivery_mode)
VALUES ($1, $2, $3, 'push')
RETURNING id
`, name, status, parent).Scan(&id)
if err != nil {
t.Fatalf("insertWorkspace(%s): %v", name, err)
}
return id
}
// statusOf reads a workspace row's current status, failing the test if
// the row is gone (a register/heartbeat must never DELETE the row).
func statusOf(t *testing.T, conn *sql.DB, id string) string {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var status string
err := conn.QueryRowContext(ctx, `SELECT status FROM workspaces WHERE id = $1`, id).Scan(&status)
if err != nil {
t.Fatalf("statusOf(%s): %v", id, err)
}
return status
}
// ---------------------------------------------------------------------------
// 1 — registry register/heartbeat row-state: the #73 tombstone guard.
//
// Watch-fail intent: drop `WHERE workspaces.status IS DISTINCT FROM
// 'removed'` from the upsert (or `AND status != 'removed'` from the
// heartbeat) and a late register/heartbeat resurrects a soft-deleted
// workspace back to 'online' — the exact bulk-delete straggler bug the
// guard fixed. These tests replay the EXACT production statements
// (registry.go:393 upsert / registry.go:604 heartbeat) so a regression
// there is caught against a real row, not just an sqlmock SQL-text diff.
// ---------------------------------------------------------------------------
// registerUpsertSQL mirrors RegistryHandler.Register's upsert at
// workspace-server/internal/handlers/registry.go:393. Kept in lockstep
// with that statement; CR2/CI must confirm it still matches the handler.
const registerUpsertSQL = `
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode)
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5)
ON CONFLICT (id) DO UPDATE SET
url = CASE
WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url
ELSE EXCLUDED.url
END,
agent_card = EXCLUDED.agent_card,
status = 'online',
last_heartbeat_at = now(),
delivery_mode = EXCLUDED.delivery_mode,
updated_at = now()
WHERE workspaces.status IS DISTINCT FROM 'removed'
`
// heartbeatUpdateSQL mirrors RegistryHandler.Heartbeat's zero-spend
// branch at workspace-server/internal/handlers/registry.go:604, reduced
// to the columns guaranteed present by the base migration set so the
// test does not depend on optional columns the apply-all-or-skip CI step
// may not have landed. The security-relevant clause — the
// `AND status != 'removed'` guard — is preserved verbatim.
const heartbeatUpdateSQL = `
UPDATE workspaces SET
last_heartbeat_at = now(),
updated_at = now()
WHERE id = $1 AND status != 'removed'
`
func TestIntegration_RegistryRowState_RegisterDoesNotResurrectRemoved(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
id := insertWorkspace(t, conn, "tombstoned-ws", "removed", "")
// A late register for a soft-deleted workspace.
if _, err := conn.ExecContext(ctx, registerUpsertSQL,
id, id, "https://agent.example.com", `{"name":"x"}`, "push"); err != nil {
t.Fatalf("register upsert: %v", err)
}
if got := statusOf(t, conn, id); got != "removed" {
t.Fatalf("removed workspace was resurrected by register: status=%q, want 'removed' (#73 guard regressed)", got)
}
}
func TestIntegration_RegistryRowState_RegisterUpsertsLiveWorkspaceToOnline(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
// A provisioning workspace registering for the first heartbeat should
// flip to online — proves the guard does NOT over-block live rows.
id := insertWorkspace(t, conn, "live-ws", "provisioning", "")
if _, err := conn.ExecContext(ctx, registerUpsertSQL,
id, id, "https://agent.example.com", `{"name":"x"}`, "push"); err != nil {
t.Fatalf("register upsert: %v", err)
}
if got := statusOf(t, conn, id); got != "online" {
t.Fatalf("live workspace register: status=%q, want 'online'", got)
}
}
func TestIntegration_RegistryRowState_HeartbeatDoesNotResurrectRemoved(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
id := insertWorkspace(t, conn, "tombstoned-ws", "removed", "")
if _, err := conn.ExecContext(ctx, heartbeatUpdateSQL, id); err != nil {
t.Fatalf("heartbeat update: %v", err)
}
if got := statusOf(t, conn, id); got != "removed" {
t.Fatalf("removed workspace mutated by heartbeat: status=%q, want 'removed' (#73 guard regressed)", got)
}
// And last_heartbeat_at must NOT have been bumped on the tombstone —
// a refreshed heartbeat would confuse the liveness monitor.
var hb sql.NullTime
if err := conn.QueryRowContext(ctx,
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, id).Scan(&hb); err != nil {
t.Fatalf("read last_heartbeat_at: %v", err)
}
if hb.Valid {
t.Fatalf("removed workspace got last_heartbeat_at bumped by heartbeat: %v (#73 guard regressed)", hb.Time)
}
}
func TestIntegration_RegistryRowState_HeartbeatUpdatesLiveWorkspace(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
id := insertWorkspace(t, conn, "live-ws", "online", "")
if _, err := conn.ExecContext(ctx, heartbeatUpdateSQL, id); err != nil {
t.Fatalf("heartbeat update: %v", err)
}
var hb sql.NullTime
if err := conn.QueryRowContext(ctx,
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, id).Scan(&hb); err != nil {
t.Fatalf("read last_heartbeat_at: %v", err)
}
if !hb.Valid {
t.Fatalf("live workspace heartbeat did NOT bump last_heartbeat_at")
}
if got := statusOf(t, conn, id); got != "online" {
t.Fatalf("live workspace heartbeat changed status unexpectedly: %q", got)
}
}
// ---------------------------------------------------------------------------
// 2 — wsauth.ValidateToken A↔B binding (the cross-tenant non-leak boundary).
//
// Watch-fail intent: drop the `workspaceID != expectedWorkspaceID` check
// (or the JOIN's `w.status != 'removed'`) and a workspace-A token would
// authenticate workspace B, or a token from a soft-removed workspace
// would stay live. sqlmock cannot prove the JOIN rejects either.
// ---------------------------------------------------------------------------
func TestIntegration_WSAuth_TokenBoundToIssuingWorkspace(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
wsA := insertWorkspace(t, conn, "ws-A", "online", "")
wsB := insertWorkspace(t, conn, "ws-B", "online", "")
plaintext, err := wsauth.IssueToken(ctx, conn, wsA)
if err != nil {
t.Fatalf("IssueToken: %v", err)
}
// Correct binding: token validates for its own workspace.
if err := wsauth.ValidateToken(ctx, conn, wsA, plaintext); err != nil {
t.Fatalf("ValidateToken(A, tokenA): want nil, got %v", err)
}
// Cross-workspace replay: A's token MUST NOT authenticate B.
if err := wsauth.ValidateToken(ctx, conn, wsB, plaintext); err != wsauth.ErrInvalidToken {
t.Fatalf("ValidateToken(B, tokenA): want ErrInvalidToken, got %v (cross-tenant binding regressed)", err)
}
// WorkspaceFromToken resolves the OWNING workspace, never B.
owner, err := wsauth.WorkspaceFromToken(ctx, conn, plaintext)
if err != nil {
t.Fatalf("WorkspaceFromToken: %v", err)
}
if owner != wsA {
t.Fatalf("WorkspaceFromToken: got %q, want %q (token rebinding regressed)", owner, wsA)
}
}
func TestIntegration_WSAuth_TokenOfRemovedWorkspaceRejected(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
ws := insertWorkspace(t, conn, "ws-soon-removed", "online", "")
plaintext, err := wsauth.IssueToken(ctx, conn, ws)
if err != nil {
t.Fatalf("IssueToken: %v", err)
}
// Sanity: live before removal.
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != nil {
t.Fatalf("pre-removal ValidateToken: want nil, got %v", err)
}
// Soft-remove the workspace (tombstone) — the token row is NOT
// touched, so only the JOIN's status filter can reject it.
if _, err := conn.ExecContext(ctx,
`UPDATE workspaces SET status = 'removed' WHERE id = $1`, ws); err != nil {
t.Fatalf("soft-remove: %v", err)
}
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != wsauth.ErrInvalidToken {
t.Fatalf("ValidateToken after soft-remove: want ErrInvalidToken, got %v (removed-workspace JOIN filter regressed)", err)
}
if _, err := wsauth.WorkspaceFromToken(ctx, conn, plaintext); err != wsauth.ErrInvalidToken {
t.Fatalf("WorkspaceFromToken after soft-remove: want ErrInvalidToken, got %v", err)
}
}
func TestIntegration_WSAuth_RevokeAllForWorkspaceKillsToken(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
ws := insertWorkspace(t, conn, "ws-rotate", "online", "")
plaintext, err := wsauth.IssueToken(ctx, conn, ws)
if err != nil {
t.Fatalf("IssueToken: %v", err)
}
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != nil {
t.Fatalf("pre-revoke ValidateToken: %v", err)
}
if err := wsauth.RevokeAllForWorkspace(ctx, conn, ws); err != nil {
t.Fatalf("RevokeAllForWorkspace: %v", err)
}
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != wsauth.ErrInvalidToken {
t.Fatalf("ValidateToken after revoke: want ErrInvalidToken, got %v (revoked_at filter regressed)", err)
}
}
// ---------------------------------------------------------------------------
// 3 — registry.CanCommunicate parent_id hierarchy (cross-tenant non-chatter).
//
// Topology (two distinct tenant trees):
//
// rootX (org root, parent_id NULL) rootY (org root, parent_id NULL)
// ├── leadX └── leadY
// │ ├── engA (leaf)
// │ └── engB (leaf, sibling of engA)
//
// Watch-fail intent: relax the parent_id scoping and a leaf under rootX
// could message a leaf under rootY (cross-tenant leak), or two org roots
// (both parent_id NULL) would be treated as siblings. CanCommunicate
// reads the package-global db.DB, which integrationAuthDB hot-swaps to
// the test connection.
// ---------------------------------------------------------------------------
func TestIntegration_CanCommunicate_HierarchyAndCrossTenantIsolation(t *testing.T) {
conn := integrationAuthDB(t)
rootX := insertWorkspace(t, conn, "rootX", "online", "")
leadX := insertWorkspace(t, conn, "leadX", "online", rootX)
engA := insertWorkspace(t, conn, "engA", "online", leadX)
engB := insertWorkspace(t, conn, "engB", "online", leadX)
rootY := insertWorkspace(t, conn, "rootY", "online", "")
leadY := insertWorkspace(t, conn, "leadY", "online", rootY)
cases := []struct {
name string
caller string
target string
want bool
}{
{"self to self", engA, engA, true},
{"siblings (same parent leadX)", engA, engB, true},
{"direct parent to child", leadX, engA, true},
{"direct child to parent", engA, leadX, true},
{"distant ancestor to descendant", rootX, engA, true},
{"distant descendant to ancestor", engA, rootX, true},
// Cross-tenant non-leak: nothing under rootX may talk to rootY tree.
{"cross-tenant leaf to leaf", engA, leadY, false},
{"cross-tenant leaf to other root", engA, rootY, false},
{"cross-tenant root to root", rootX, rootY, false},
// Two org roots both have parent_id NULL — must NOT be siblings.
{"two org roots are not siblings", rootX, rootY, false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := registry.CanCommunicate(tc.caller, tc.target); got != tc.want {
t.Fatalf("CanCommunicate(%s, %s) = %v, want %v", tc.caller, tc.target, got, tc.want)
}
})
}
}
// ---------------------------------------------------------------------------
// 4 — orgtoken revoke / validate row-state.
//
// Watch-fail intent: drop `revoked_at IS NULL` from Validate (or the
// `AND revoked_at IS NULL` from Revoke's idempotency) and a revoked org-
// admin token would keep authenticating, or a second revoke would report
// success on an already-dead token. sqlmock is satisfied by "an UPDATE
// fired" and cannot observe the row no longer authenticates.
// ---------------------------------------------------------------------------
func TestIntegration_OrgToken_RevokeStopsValidation(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
// org_id references workspaces(id); anchor the token to a real org root.
org := insertWorkspace(t, conn, "org-root", "online", "")
plaintext, id, err := orgtoken.Issue(ctx, conn, "ci-token", "tester", org)
if err != nil {
t.Fatalf("Issue: %v", err)
}
// Live token validates and reports its org anchor.
gotID, _, gotOrg, err := orgtoken.Validate(ctx, conn, plaintext)
if err != nil {
t.Fatalf("Validate (live): %v", err)
}
if gotID != id {
t.Fatalf("Validate id = %q, want %q", gotID, id)
}
if gotOrg != org {
t.Fatalf("Validate org_id = %q, want %q (org-anchor regressed)", gotOrg, org)
}
// First revoke flips live → revoked.
transitioned, err := orgtoken.Revoke(ctx, conn, id)
if err != nil {
t.Fatalf("Revoke: %v", err)
}
if !transitioned {
t.Fatalf("Revoke: want true (live→revoked), got false")
}
// Revoked token MUST NOT validate.
if _, _, _, err := orgtoken.Validate(ctx, conn, plaintext); err != orgtoken.ErrInvalidToken {
t.Fatalf("Validate after revoke: want ErrInvalidToken, got %v (revoked_at filter regressed)", err)
}
// Idempotent re-revoke reports false (already revoked), not an error.
transitioned2, err := orgtoken.Revoke(ctx, conn, id)
if err != nil {
t.Fatalf("re-Revoke: %v", err)
}
if transitioned2 {
t.Fatalf("re-Revoke: want false (already revoked), got true (idempotency guard regressed)")
}
}
func TestIntegration_OrgToken_ListExcludesRevoked(t *testing.T) {
conn := integrationAuthDB(t)
ctx := context.Background()
org := insertWorkspace(t, conn, "org-root", "online", "")
_, liveID, err := orgtoken.Issue(ctx, conn, "live", "tester", org)
if err != nil {
t.Fatalf("Issue live: %v", err)
}
_, deadID, err := orgtoken.Issue(ctx, conn, "dead", "tester", org)
if err != nil {
t.Fatalf("Issue dead: %v", err)
}
if _, err := orgtoken.Revoke(ctx, conn, deadID); err != nil {
t.Fatalf("Revoke dead: %v", err)
}
// List must return only the live token (exercises the real
// COALESCE(org_id::text,'') cast that sqlmock can't type-check —
// see the comment in orgtoken.List).
tokens, err := orgtoken.List(ctx, conn)
if err != nil {
t.Fatalf("List: %v", err)
}
if len(tokens) != 1 {
t.Fatalf("List returned %d tokens, want 1 (revoked-exclusion regressed)", len(tokens))
}
if tokens[0].ID != liveID {
t.Fatalf("List returned id %q, want live id %q", tokens[0].ID, liveID)
}
if tokens[0].OrgID != org {
t.Fatalf("List org_id = %q, want %q", tokens[0].OrgID, org)
}
// HasAnyLive is true with one live token, false once it's revoked.
if ok, err := orgtoken.HasAnyLive(ctx, conn); err != nil || !ok {
t.Fatalf("HasAnyLive (one live): ok=%v err=%v, want true,nil", ok, err)
}
if _, err := orgtoken.Revoke(ctx, conn, liveID); err != nil {
t.Fatalf("Revoke live: %v", err)
}
if ok, err := orgtoken.HasAnyLive(ctx, conn); err != nil || ok {
t.Fatalf("HasAnyLive (none live): ok=%v err=%v, want false,nil", ok, err)
}
}