Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 44ab45720f |
@@ -24,6 +24,17 @@ Three failure classes:
|
||||
F3 (B) and (C) are not set-equal. Audit env wider than protection
|
||||
→ audit flags non-force-merges as force; narrower → real
|
||||
force-merges are missed.
|
||||
F4 Context in (B) is emitted by NO workflow in .gitea/workflows/ at
|
||||
all (repo-wide, case-correct generalization of F2, which only
|
||||
covers `ci / `-prefixed names). This is the inverse-of-F2 hole and
|
||||
the one that makes the `CI / all-required` aggregator's
|
||||
name-vs-coverage gap safe: `all-required` is fail-closed over CI's
|
||||
OWN jobs but CANNOT cover sibling required workflows
|
||||
(`E2E API Smoke Test`, `Handlers Postgres Integration` — Gitea has
|
||||
no cross-workflow `needs:`). F4 verifies each cross-workflow
|
||||
required context still has a live emitter, so a renamed/deleted
|
||||
sibling workflow that BP still requires is caught instead of
|
||||
degrading to a silent absent-as-pending advisory gate.
|
||||
|
||||
Idempotency:
|
||||
Searches OPEN issues by exact title prefix
|
||||
@@ -380,6 +391,68 @@ def expected_context(job_key: str, workflow_name: str = "ci") -> str:
|
||||
return f"{workflow_name} / {job_key} (pull_request)"
|
||||
|
||||
|
||||
def workflow_emitted_contexts(wf_doc: dict) -> set[str]:
|
||||
"""The set of `pull_request` status-check contexts a SINGLE workflow
|
||||
emits, computed from its real `name:` + each job's `name or key`.
|
||||
|
||||
Gitea reports a context as `{workflow.name} / {job.name|job.key}
|
||||
(pull_request)`. Unlike `expected_context()` (which hard-codes the
|
||||
lowercase literal `ci` and the bare job-KEY — a shape that does NOT
|
||||
match this repo, whose workflow is `name: CI` and whose CI jobs DO
|
||||
set per-job `name:`), this reads the authoritative names straight
|
||||
from the parsed YAML, so the contexts it produces are byte-equal to
|
||||
what BP records. Used by F4 (cross-workflow emitter existence).
|
||||
|
||||
Jobs whose `if:` gates on `github.event_name`/`github.ref` are still
|
||||
emitters on the events they DO run — they remain in the set; F4 only
|
||||
asserts *existence of an emitter*, never that it ran on a given
|
||||
trigger."""
|
||||
name = wf_doc.get("name")
|
||||
if not isinstance(name, str) or not name:
|
||||
return set()
|
||||
jobs = wf_doc.get("jobs")
|
||||
if not isinstance(jobs, dict):
|
||||
return set()
|
||||
out: set[str] = set()
|
||||
for key, spec in jobs.items():
|
||||
job_name = key
|
||||
if isinstance(spec, dict) and isinstance(spec.get("name"), str) and spec["name"]:
|
||||
job_name = spec["name"]
|
||||
out.add(f"{name} / {job_name} (pull_request)")
|
||||
return out
|
||||
|
||||
|
||||
def all_emitted_contexts(workflows_dir: str = ".gitea/workflows") -> set[str]:
|
||||
"""Union of `pull_request` contexts emitted by EVERY workflow in the
|
||||
repo. F4 uses this to assert that each BP-required
|
||||
`status_check_contexts` entry corresponds to a real emitting
|
||||
workflow+job — closing the inverse-of-F2 hole where BP requires a
|
||||
context that NO workflow produces (e.g. a sibling workflow like
|
||||
`E2E API Smoke Test` or `Handlers Postgres Integration` was renamed
|
||||
or deleted while still required, leaving BP demanding a green it can
|
||||
never receive; Gitea treats absent-as-pending → silent advisory
|
||||
gate). This is what makes the misleadingly-named `CI / all-required`
|
||||
aggregator safe at the repo level: it only covers CI's own jobs, but
|
||||
F4 guarantees the cross-workflow required contexts it CANNOT cover
|
||||
are real and present."""
|
||||
import glob as _glob
|
||||
|
||||
emitted: set[str] = set()
|
||||
for path in sorted(_glob.glob(os.path.join(workflows_dir, "*.yml"))):
|
||||
try:
|
||||
with open(path, encoding="utf-8") as f:
|
||||
doc = yaml.safe_load(f)
|
||||
except (OSError, yaml.YAMLError):
|
||||
# A single unparseable sibling workflow must not blind F4 to
|
||||
# the rest. Skip it loudly; lint-workflow-yaml gates parse
|
||||
# validity separately.
|
||||
sys.stderr.write(f"::warning::F4: could not parse {path}, skipping\n")
|
||||
continue
|
||||
if isinstance(doc, dict):
|
||||
emitted |= workflow_emitted_contexts(doc)
|
||||
return emitted
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Drift detection
|
||||
# --------------------------------------------------------------------------
|
||||
@@ -531,6 +604,36 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
|
||||
+ "\n".join(f" - {c}" for c in stale_protection)
|
||||
)
|
||||
|
||||
# ----- F4: cross-workflow required context has no emitting workflow -----
|
||||
# F2 (above) is scoped to `ci / `-prefixed contexts ONLY, and built
|
||||
# from the hard-coded lowercase literal `ci` + bare job-keys — a shape
|
||||
# that does NOT match this repo (workflow is `name: CI`, jobs set their
|
||||
# own `name:`), so F2 is effectively dormant here. F4 is the
|
||||
# case-correct, REPO-WIDE generalization: it parses every workflow's
|
||||
# real `name:` + job `name|key` and asserts that EVERY BP-required
|
||||
# context is actually emitted by some workflow.
|
||||
#
|
||||
# This is the gate that makes the `CI / all-required` aggregator's
|
||||
# name-vs-coverage gap safe. `all-required` is fail-closed over CI's
|
||||
# OWN jobs but — by Gitea's design (no cross-workflow `needs:`) — it
|
||||
# CANNOT and does not cover sibling required workflows
|
||||
# (`E2E API Smoke Test`, `Handlers Postgres Integration`). Those MUST
|
||||
# be listed in BP independently. F4 verifies each such BP context
|
||||
# still has a live emitter, so the inverse-of-F2 hole — BP requires a
|
||||
# context that no workflow produces (rename/delete a sibling workflow
|
||||
# while still required → Gitea treats absent-as-pending → silent
|
||||
# advisory gate, and a red PR can look mergeable) — is caught.
|
||||
repo_emitted = all_emitted_contexts(os.path.dirname(CI_WORKFLOW_PATH))
|
||||
unemitted = sorted(c for c in contexts if c not in repo_emitted)
|
||||
if unemitted:
|
||||
findings.append(
|
||||
"F4 — branch_protections/{br}.status_check_contexts entries that "
|
||||
"NO workflow in .gitea/workflows/ emits "
|
||||
"(stale required name → silent advisory gate; a red PR can look "
|
||||
"mergeable):\n".format(br=branch)
|
||||
+ "\n".join(f" - {c}" for c in unemitted)
|
||||
)
|
||||
|
||||
# ----- F3: audit env vs protection contexts (set-equal) -----
|
||||
only_in_env = sorted(env_set - contexts)
|
||||
only_in_protection = sorted(contexts - env_set)
|
||||
@@ -556,6 +659,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
|
||||
"protection_contexts": sorted(contexts),
|
||||
"audit_env_checks": sorted(env_set),
|
||||
"expected_contexts": sorted(emitted_contexts),
|
||||
"repo_emitted_contexts": sorted(repo_emitted),
|
||||
}
|
||||
return findings, debug
|
||||
|
||||
|
||||
@@ -26,12 +26,6 @@ PROFILES: dict[str, dict[str, str]] = {
|
||||
"handlers": (
|
||||
r"^workspace-server/internal/handlers/"
|
||||
r"|^workspace-server/internal/wsauth/"
|
||||
# #2148: registry-auth real-PG integration tests (CanCommunicate
|
||||
# parent_id hierarchy lives in internal/registry; org-admin token
|
||||
# revoke/validate lives in internal/orgtoken) run in this same
|
||||
# workflow, so a regression in either package MUST trigger the job.
|
||||
r"|^workspace-server/internal/registry/"
|
||||
r"|^workspace-server/internal/orgtoken/"
|
||||
# #2149: the scheduler real-PG integration tests run in this same
|
||||
# workflow (they reuse its migrated Postgres), so changes to the
|
||||
# scheduler package must trigger the job too.
|
||||
|
||||
@@ -275,3 +275,125 @@ def test_detect_drift_no_f1_when_needs_empty_even_with_jobs():
|
||||
findings, _ = drift.detect_drift("main")
|
||||
|
||||
assert not any("F1 —" in f for f in findings)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# F4 — cross-workflow required-context emitter existence
|
||||
# (closes the `CI / all-required` name-vs-coverage hole: the sentinel is
|
||||
# fail-closed over CI's own jobs but CANNOT cover sibling required
|
||||
# workflows — Gitea has no cross-workflow `needs:` — so F4 guarantees each
|
||||
# BP-required context still has a live emitting workflow.)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_workflow_emitted_contexts_uses_job_name_over_key():
|
||||
"""Job `name:` wins over key; missing name falls back to key."""
|
||||
doc = {
|
||||
"name": "E2E API Smoke Test",
|
||||
"jobs": {
|
||||
"detect-changes": {}, # no name -> key
|
||||
"e2e-api": {"name": "E2E API Smoke Test"},
|
||||
},
|
||||
}
|
||||
got = drift.workflow_emitted_contexts(doc)
|
||||
assert got == {
|
||||
"E2E API Smoke Test / detect-changes (pull_request)",
|
||||
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
|
||||
}
|
||||
|
||||
|
||||
def test_workflow_emitted_contexts_empty_when_no_name():
|
||||
"""A workflow with no top-level `name:` emits nothing F4 can match."""
|
||||
assert drift.workflow_emitted_contexts({"jobs": {"x": {}}}) == set()
|
||||
|
||||
|
||||
def test_all_emitted_contexts_unions_workflow_dir(tmp_path):
|
||||
"""all_emitted_contexts globs *.yml and unions their emitter sets."""
|
||||
wf = tmp_path / "wf"
|
||||
wf.mkdir()
|
||||
(wf / "a.yml").write_text(
|
||||
"name: CI\njobs:\n all-required:\n runs-on: x\n", encoding="utf-8"
|
||||
)
|
||||
(wf / "b.yml").write_text(
|
||||
"name: Handlers Postgres Integration\n"
|
||||
"jobs:\n integration:\n name: Handlers Postgres Integration\n"
|
||||
" runs-on: x\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
got = drift.all_emitted_contexts(str(wf))
|
||||
assert "CI / all-required (pull_request)" in got
|
||||
assert "Handlers Postgres Integration / Handlers Postgres Integration (pull_request)" in got
|
||||
|
||||
|
||||
def test_all_emitted_contexts_skips_unparseable(tmp_path):
|
||||
"""A single broken sibling workflow must not blind F4 to the rest."""
|
||||
wf = tmp_path / "wf"
|
||||
wf.mkdir()
|
||||
(wf / "good.yml").write_text("name: CI\njobs:\n j:\n runs-on: x\n", encoding="utf-8")
|
||||
(wf / "bad.yml").write_text("name: [unterminated\n : : :\n", encoding="utf-8")
|
||||
got = drift.all_emitted_contexts(str(wf))
|
||||
assert "CI / j (pull_request)" in got
|
||||
|
||||
|
||||
# A BP fixture that includes the two cross-workflow required contexts.
|
||||
_BP_WITH_SIBLINGS = {
|
||||
"status_check_contexts": [
|
||||
"CI / all-required (pull_request)",
|
||||
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
|
||||
"Handlers Postgres Integration / Handlers Postgres Integration (pull_request)",
|
||||
]
|
||||
}
|
||||
|
||||
# The matching set of repo-wide emitted contexts (what a correct repo produces).
|
||||
_EMITTED_OK = {
|
||||
"CI / all-required (pull_request)",
|
||||
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
|
||||
"Handlers Postgres Integration / Handlers Postgres Integration (pull_request)",
|
||||
}
|
||||
|
||||
|
||||
def test_detect_drift_f4_silent_when_all_contexts_emitted():
|
||||
"""No F4 when every BP context has a live emitting workflow."""
|
||||
ci = _make_ci_doc({"all-required": {}})
|
||||
audit = _make_audit_doc(sorted(_BP_WITH_SIBLINGS["status_check_contexts"]))
|
||||
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
|
||||
with patch.object(drift, "api", return_value=(200, _BP_WITH_SIBLINGS)):
|
||||
with patch.object(drift, "all_emitted_contexts", return_value=set(_EMITTED_OK)):
|
||||
findings, debug = drift.detect_drift("main")
|
||||
assert not any("F4 —" in f for f in findings)
|
||||
assert debug["repo_emitted_contexts"] == sorted(_EMITTED_OK)
|
||||
|
||||
|
||||
def test_detect_drift_f4_fires_on_stale_cross_workflow_context():
|
||||
"""The core gate-hole regression: BP requires a cross-workflow context
|
||||
(e.g. a renamed/deleted sibling workflow) that NO workflow emits.
|
||||
F4 must fire — this is the inverse-of-F2 hole that makes a red PR look
|
||||
mergeable if BP is ever trimmed/renamed around `CI / all-required`."""
|
||||
ci = _make_ci_doc({"all-required": {}})
|
||||
audit = _make_audit_doc(sorted(_BP_WITH_SIBLINGS["status_check_contexts"]))
|
||||
# Handlers workflow got renamed -> its OLD BP context now has no emitter.
|
||||
emitted_after_rename = {
|
||||
"CI / all-required (pull_request)",
|
||||
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
|
||||
# Handlers context absent (renamed away)
|
||||
}
|
||||
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
|
||||
with patch.object(drift, "api", return_value=(200, _BP_WITH_SIBLINGS)):
|
||||
with patch.object(drift, "all_emitted_contexts", return_value=emitted_after_rename):
|
||||
findings, _ = drift.detect_drift("main")
|
||||
assert any("F4 —" in f for f in findings)
|
||||
assert any("Handlers Postgres Integration" in f for f in findings)
|
||||
|
||||
|
||||
def test_detect_drift_f4_catches_all_required_only_trim():
|
||||
"""If BP is trimmed to JUST `CI / all-required` but E2E/Handlers are
|
||||
still real workflows, F4 does NOT fire (no stale context) — but F3b
|
||||
(env vs BP) / operator policy must keep them required. This asserts F4
|
||||
does not false-positive on a correctly-emitted lone context."""
|
||||
bp = {"status_check_contexts": ["CI / all-required (pull_request)"]}
|
||||
ci = _make_ci_doc({"all-required": {}})
|
||||
audit = _make_audit_doc(["CI / all-required (pull_request)"])
|
||||
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
|
||||
with patch.object(drift, "api", return_value=(200, bp)):
|
||||
with patch.object(drift, "all_emitted_contexts", return_value=set(_EMITTED_OK)):
|
||||
findings, _ = drift.detect_drift("main")
|
||||
assert not any("F4 —" in f for f in findings)
|
||||
|
||||
@@ -500,6 +500,27 @@ jobs:
|
||||
all-required:
|
||||
# Aggregator sentinel — RFC internal#219 §2 (Phase 4 — closes internal#286).
|
||||
#
|
||||
# ── SCOPE (read before trusting the name) ─────────────────────────
|
||||
# "all-required" means "all of THIS workflow's (CI's) required jobs"
|
||||
# — NOT "all of branch-protection's required checks". It is fail-
|
||||
# closed over its `needs:` (the CI jobs below), but Gitea Actions has
|
||||
# NO cross-workflow `needs:`, so this sentinel STRUCTURALLY CANNOT and
|
||||
# does not cover sibling required workflows that live in their own
|
||||
# files — notably:
|
||||
# • `E2E API Smoke Test` (.gitea/workflows/e2e-api.yml)
|
||||
# • `Handlers Postgres Integration`(.gitea/workflows/handlers-postgres-integration.yml)
|
||||
# Those emit their OWN status contexts and MUST be listed in branch
|
||||
# protection `status_check_contexts` INDEPENDENTLY of this sentinel.
|
||||
# They are today; do NOT trim BP down to just `CI / all-required` on
|
||||
# the assumption that it covers them — it does not, and a red E2E /
|
||||
# Handlers run would then look mergeable (observed: core PR #1086 @
|
||||
# 9136d05a — `CI / all-required` green while E2E (id 48) + Handlers
|
||||
# (id 47) were red; not exploitable only because BP still requires
|
||||
# all three). The cross-workflow coverage is enforced separately by
|
||||
# ci-required-drift.py's F4 check (every BP context must have a live
|
||||
# emitting workflow), which is what keeps this name honest.
|
||||
# ──────────────────────────────────────────────────────────────────
|
||||
#
|
||||
# Emits `CI / all-required (<event>)` where <event> is the workflow trigger
|
||||
# (e.g. `CI / all-required (pull_request)`, `CI / all-required (push)`).
|
||||
# Branch protection requires the event-suffixed name —
|
||||
|
||||
@@ -272,6 +272,20 @@ func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
|
||||
}
|
||||
}
|
||||
|
||||
// QueueDepth returns the number of currently-queued (not dispatched/completed)
|
||||
// items for a workspace. Used by the busy-return response body so callers
|
||||
// can see how many ahead of them.
|
||||
func QueueDepth(ctx context.Context, workspaceID string) int {
|
||||
var n int
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`,
|
||||
workspaceID,
|
||||
).Scan(&n); err != nil {
|
||||
log.Printf("A2AQueue: QueueDepth query failed for workspace %s: %v", workspaceID, err)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// DropStaleQueueItems marks queued items older than maxAge as 'dropped' with a
|
||||
// system-generated reason so PM agents stop processing stale post-incident noise.
|
||||
// Called with a workspaceID to scope cleanup to one workspace, or empty to sweep
|
||||
|
||||
@@ -1,539 +0,0 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// registry_auth_integration_test.go — REAL Postgres integration tests for
|
||||
// the registry-auth + cross-tenant security boundary (issue #2148).
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
// docker run --rm -d --name pg-integration \
|
||||
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
|
||||
// -p 55432:5432 postgres:15-alpine
|
||||
// sleep 4
|
||||
// # apply migrations 001 (workspaces), 020 (workspace_auth_tokens),
|
||||
// # 035/036 (org_api_tokens + org_id), 043 (status enum), 044
|
||||
// # (platform_inbound_secret), 045 (delivery_mode) — CI applies the
|
||||
// # full migrations/ set in lexicographic order (apply-all-or-skip).
|
||||
// cd workspace-server
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run '^TestIntegration_(RegistryRowState|WSAuth|CanCommunicate|OrgToken)'
|
||||
//
|
||||
// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on
|
||||
// every PR that touches workspace-server/internal/{handlers,wsauth,
|
||||
// registry,orgtoken}/** OR workspace-server/migrations/**. The
|
||||
// detect-changes `handlers-postgres` profile was widened to include the
|
||||
// registry + orgtoken packages in the same PR that added this file so a
|
||||
// regression in CanCommunicate / orgtoken.Revoke actually triggers the
|
||||
// suite (#2148).
|
||||
//
|
||||
// Why these are NOT plain unit tests
|
||||
// ----------------------------------
|
||||
// The strict-sqlmock unit tests in wsauth/tokens_test.go,
|
||||
// orgtoken/tokens_test.go, and registry/access_test.go pin which SQL
|
||||
// statements fire — they are fast and let us iterate without a DB. But
|
||||
// sqlmock asserts the SQL TEXT, not that a real Postgres ENFORCES the
|
||||
// security predicate. The whole value of this package is the
|
||||
// cross-tenant non-leak boundary:
|
||||
//
|
||||
// - wsauth.ValidateToken binds a bearer to ONE workspace_id; sqlmock
|
||||
// cannot prove the JOIN on workspaces + the workspaceID equality
|
||||
// actually rejects a token replayed against a different workspace,
|
||||
// or a token whose workspace was soft-removed.
|
||||
// - registry.CanCommunicate walks the parent_id chain in real rows;
|
||||
// sqlmock returns canned rows so it can never catch a query that
|
||||
// leaks a SIBLING under a different org root, or a self→self / cross-
|
||||
// tenant decision that depends on the actual stored parent_id.
|
||||
// - orgtoken.Revoke / Validate depend on the partial-index +
|
||||
// revoked_at IS NULL predicate landing the row state; sqlmock is
|
||||
// satisfied by "an UPDATE fired".
|
||||
// - the registry register/heartbeat #73 guard
|
||||
// (WHERE workspaces.status IS DISTINCT FROM 'removed' /
|
||||
// status != 'removed') is a ROW-STATE invariant: a late
|
||||
// register/heartbeat MUST NOT resurrect a soft-deleted tombstone.
|
||||
// sqlmock cannot observe that the row stayed 'removed'.
|
||||
//
|
||||
// These tests close those gaps by booting a real Postgres, running the
|
||||
// production functions (and, for register/heartbeat, replaying the exact
|
||||
// production statement documented at registry.go:393 / registry.go:604),
|
||||
// and SELECTing the row to verify the observable state.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/orgtoken"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/registry"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/wsauth"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
// integrationAuthDB opens a connection from $INTEGRATION_DB_URL (skipping
|
||||
// the test if unset), wipes the registry-auth tables for isolation, and
|
||||
// hot-swaps the package-level mdb.DB so production functions that read the
|
||||
// global (registry.CanCommunicate → registry.getWorkspaceRef) see this
|
||||
// same connection. Restores the previous global + closes the conn via
|
||||
// t.Cleanup.
|
||||
//
|
||||
// NOT SAFE FOR t.Parallel() — it mutates the package global and owns the
|
||||
// tables for the duration of the test. This mirrors integrationDB in
|
||||
// delegation_ledger_integration_test.go but wipes the auth tables (not
|
||||
// delegations) and is kept separate so each suite's wipe step is local.
|
||||
//
|
||||
// Wipe order respects FKs: workspace_auth_tokens + org_api_tokens
|
||||
// reference workspaces, so they go first. org_api_tokens.org_id is
|
||||
// ON DELETE SET NULL and workspace_auth_tokens.workspace_id is
|
||||
// ON DELETE CASCADE, but an explicit ordered DELETE keeps the intent
|
||||
// obvious and avoids leaving rows behind if the FK actions ever change.
|
||||
func integrationAuthDB(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
url := os.Getenv("INTEGRATION_DB_URL")
|
||||
if url == "" {
|
||||
t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)")
|
||||
}
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := conn.PingContext(ctx); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
for _, tbl := range []string{"workspace_auth_tokens", "org_api_tokens", "workspaces"} {
|
||||
if _, err := conn.ExecContext(ctx, "DELETE FROM "+tbl); err != nil {
|
||||
t.Fatalf("cleanup %s: %v", tbl, err)
|
||||
}
|
||||
}
|
||||
prev := mdb.DB
|
||||
mdb.DB = conn
|
||||
t.Cleanup(func() {
|
||||
mdb.DB = prev
|
||||
conn.Close()
|
||||
})
|
||||
return conn
|
||||
}
|
||||
|
||||
// insertWorkspace creates a workspace row with the given status and
|
||||
// optional parent, returning the DB-generated UUID. parentID may be the
|
||||
// empty string for a root-level workspace (parent_id IS NULL). status
|
||||
// must be a valid workspaces.status enum value (043 migration).
|
||||
func insertWorkspace(t *testing.T, conn *sql.DB, name, status, parentID string) string {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
var parent any
|
||||
if parentID != "" {
|
||||
parent = parentID
|
||||
}
|
||||
var id string
|
||||
err := conn.QueryRowContext(ctx, `
|
||||
INSERT INTO workspaces (name, status, parent_id, delivery_mode)
|
||||
VALUES ($1, $2, $3, 'push')
|
||||
RETURNING id
|
||||
`, name, status, parent).Scan(&id)
|
||||
if err != nil {
|
||||
t.Fatalf("insertWorkspace(%s): %v", name, err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// statusOf reads a workspace row's current status, failing the test if
|
||||
// the row is gone (a register/heartbeat must never DELETE the row).
|
||||
func statusOf(t *testing.T, conn *sql.DB, id string) string {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
var status string
|
||||
err := conn.QueryRowContext(ctx, `SELECT status FROM workspaces WHERE id = $1`, id).Scan(&status)
|
||||
if err != nil {
|
||||
t.Fatalf("statusOf(%s): %v", id, err)
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 1 — registry register/heartbeat row-state: the #73 tombstone guard.
|
||||
//
|
||||
// Watch-fail intent: drop `WHERE workspaces.status IS DISTINCT FROM
|
||||
// 'removed'` from the upsert (or `AND status != 'removed'` from the
|
||||
// heartbeat) and a late register/heartbeat resurrects a soft-deleted
|
||||
// workspace back to 'online' — the exact bulk-delete straggler bug the
|
||||
// guard fixed. These tests replay the EXACT production statements
|
||||
// (registry.go:393 upsert / registry.go:604 heartbeat) so a regression
|
||||
// there is caught against a real row, not just an sqlmock SQL-text diff.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// registerUpsertSQL mirrors RegistryHandler.Register's upsert at
|
||||
// workspace-server/internal/handlers/registry.go:393. Kept in lockstep
|
||||
// with that statement; CR2/CI must confirm it still matches the handler.
|
||||
const registerUpsertSQL = `
|
||||
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode)
|
||||
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
url = CASE
|
||||
WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url
|
||||
ELSE EXCLUDED.url
|
||||
END,
|
||||
agent_card = EXCLUDED.agent_card,
|
||||
status = 'online',
|
||||
last_heartbeat_at = now(),
|
||||
delivery_mode = EXCLUDED.delivery_mode,
|
||||
updated_at = now()
|
||||
WHERE workspaces.status IS DISTINCT FROM 'removed'
|
||||
`
|
||||
|
||||
// heartbeatUpdateSQL mirrors RegistryHandler.Heartbeat's zero-spend
|
||||
// branch at workspace-server/internal/handlers/registry.go:604, reduced
|
||||
// to the columns guaranteed present by the base migration set so the
|
||||
// test does not depend on optional columns the apply-all-or-skip CI step
|
||||
// may not have landed. The security-relevant clause — the
|
||||
// `AND status != 'removed'` guard — is preserved verbatim.
|
||||
const heartbeatUpdateSQL = `
|
||||
UPDATE workspaces SET
|
||||
last_heartbeat_at = now(),
|
||||
updated_at = now()
|
||||
WHERE id = $1 AND status != 'removed'
|
||||
`
|
||||
|
||||
func TestIntegration_RegistryRowState_RegisterDoesNotResurrectRemoved(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
id := insertWorkspace(t, conn, "tombstoned-ws", "removed", "")
|
||||
|
||||
// A late register for a soft-deleted workspace.
|
||||
if _, err := conn.ExecContext(ctx, registerUpsertSQL,
|
||||
id, id, "https://agent.example.com", `{"name":"x"}`, "push"); err != nil {
|
||||
t.Fatalf("register upsert: %v", err)
|
||||
}
|
||||
|
||||
if got := statusOf(t, conn, id); got != "removed" {
|
||||
t.Fatalf("removed workspace was resurrected by register: status=%q, want 'removed' (#73 guard regressed)", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_RegistryRowState_RegisterUpsertsLiveWorkspaceToOnline(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// A provisioning workspace registering for the first heartbeat should
|
||||
// flip to online — proves the guard does NOT over-block live rows.
|
||||
id := insertWorkspace(t, conn, "live-ws", "provisioning", "")
|
||||
|
||||
if _, err := conn.ExecContext(ctx, registerUpsertSQL,
|
||||
id, id, "https://agent.example.com", `{"name":"x"}`, "push"); err != nil {
|
||||
t.Fatalf("register upsert: %v", err)
|
||||
}
|
||||
|
||||
if got := statusOf(t, conn, id); got != "online" {
|
||||
t.Fatalf("live workspace register: status=%q, want 'online'", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_RegistryRowState_HeartbeatDoesNotResurrectRemoved(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
id := insertWorkspace(t, conn, "tombstoned-ws", "removed", "")
|
||||
|
||||
if _, err := conn.ExecContext(ctx, heartbeatUpdateSQL, id); err != nil {
|
||||
t.Fatalf("heartbeat update: %v", err)
|
||||
}
|
||||
|
||||
if got := statusOf(t, conn, id); got != "removed" {
|
||||
t.Fatalf("removed workspace mutated by heartbeat: status=%q, want 'removed' (#73 guard regressed)", got)
|
||||
}
|
||||
|
||||
// And last_heartbeat_at must NOT have been bumped on the tombstone —
|
||||
// a refreshed heartbeat would confuse the liveness monitor.
|
||||
var hb sql.NullTime
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, id).Scan(&hb); err != nil {
|
||||
t.Fatalf("read last_heartbeat_at: %v", err)
|
||||
}
|
||||
if hb.Valid {
|
||||
t.Fatalf("removed workspace got last_heartbeat_at bumped by heartbeat: %v (#73 guard regressed)", hb.Time)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_RegistryRowState_HeartbeatUpdatesLiveWorkspace(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
id := insertWorkspace(t, conn, "live-ws", "online", "")
|
||||
|
||||
if _, err := conn.ExecContext(ctx, heartbeatUpdateSQL, id); err != nil {
|
||||
t.Fatalf("heartbeat update: %v", err)
|
||||
}
|
||||
|
||||
var hb sql.NullTime
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, id).Scan(&hb); err != nil {
|
||||
t.Fatalf("read last_heartbeat_at: %v", err)
|
||||
}
|
||||
if !hb.Valid {
|
||||
t.Fatalf("live workspace heartbeat did NOT bump last_heartbeat_at")
|
||||
}
|
||||
if got := statusOf(t, conn, id); got != "online" {
|
||||
t.Fatalf("live workspace heartbeat changed status unexpectedly: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 2 — wsauth.ValidateToken A↔B binding (the cross-tenant non-leak boundary).
|
||||
//
|
||||
// Watch-fail intent: drop the `workspaceID != expectedWorkspaceID` check
|
||||
// (or the JOIN's `w.status != 'removed'`) and a workspace-A token would
|
||||
// authenticate workspace B, or a token from a soft-removed workspace
|
||||
// would stay live. sqlmock cannot prove the JOIN rejects either.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestIntegration_WSAuth_TokenBoundToIssuingWorkspace(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
wsA := insertWorkspace(t, conn, "ws-A", "online", "")
|
||||
wsB := insertWorkspace(t, conn, "ws-B", "online", "")
|
||||
|
||||
plaintext, err := wsauth.IssueToken(ctx, conn, wsA)
|
||||
if err != nil {
|
||||
t.Fatalf("IssueToken: %v", err)
|
||||
}
|
||||
|
||||
// Correct binding: token validates for its own workspace.
|
||||
if err := wsauth.ValidateToken(ctx, conn, wsA, plaintext); err != nil {
|
||||
t.Fatalf("ValidateToken(A, tokenA): want nil, got %v", err)
|
||||
}
|
||||
|
||||
// Cross-workspace replay: A's token MUST NOT authenticate B.
|
||||
if err := wsauth.ValidateToken(ctx, conn, wsB, plaintext); err != wsauth.ErrInvalidToken {
|
||||
t.Fatalf("ValidateToken(B, tokenA): want ErrInvalidToken, got %v (cross-tenant binding regressed)", err)
|
||||
}
|
||||
|
||||
// WorkspaceFromToken resolves the OWNING workspace, never B.
|
||||
owner, err := wsauth.WorkspaceFromToken(ctx, conn, plaintext)
|
||||
if err != nil {
|
||||
t.Fatalf("WorkspaceFromToken: %v", err)
|
||||
}
|
||||
if owner != wsA {
|
||||
t.Fatalf("WorkspaceFromToken: got %q, want %q (token rebinding regressed)", owner, wsA)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_WSAuth_TokenOfRemovedWorkspaceRejected(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
ws := insertWorkspace(t, conn, "ws-soon-removed", "online", "")
|
||||
plaintext, err := wsauth.IssueToken(ctx, conn, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("IssueToken: %v", err)
|
||||
}
|
||||
// Sanity: live before removal.
|
||||
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != nil {
|
||||
t.Fatalf("pre-removal ValidateToken: want nil, got %v", err)
|
||||
}
|
||||
|
||||
// Soft-remove the workspace (tombstone) — the token row is NOT
|
||||
// touched, so only the JOIN's status filter can reject it.
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = 'removed' WHERE id = $1`, ws); err != nil {
|
||||
t.Fatalf("soft-remove: %v", err)
|
||||
}
|
||||
|
||||
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != wsauth.ErrInvalidToken {
|
||||
t.Fatalf("ValidateToken after soft-remove: want ErrInvalidToken, got %v (removed-workspace JOIN filter regressed)", err)
|
||||
}
|
||||
if _, err := wsauth.WorkspaceFromToken(ctx, conn, plaintext); err != wsauth.ErrInvalidToken {
|
||||
t.Fatalf("WorkspaceFromToken after soft-remove: want ErrInvalidToken, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_WSAuth_RevokeAllForWorkspaceKillsToken(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
ws := insertWorkspace(t, conn, "ws-rotate", "online", "")
|
||||
plaintext, err := wsauth.IssueToken(ctx, conn, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("IssueToken: %v", err)
|
||||
}
|
||||
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != nil {
|
||||
t.Fatalf("pre-revoke ValidateToken: %v", err)
|
||||
}
|
||||
|
||||
if err := wsauth.RevokeAllForWorkspace(ctx, conn, ws); err != nil {
|
||||
t.Fatalf("RevokeAllForWorkspace: %v", err)
|
||||
}
|
||||
|
||||
if err := wsauth.ValidateToken(ctx, conn, ws, plaintext); err != wsauth.ErrInvalidToken {
|
||||
t.Fatalf("ValidateToken after revoke: want ErrInvalidToken, got %v (revoked_at filter regressed)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 3 — registry.CanCommunicate parent_id hierarchy (cross-tenant non-chatter).
|
||||
//
|
||||
// Topology (two distinct tenant trees):
|
||||
//
|
||||
// rootX (org root, parent_id NULL) rootY (org root, parent_id NULL)
|
||||
// ├── leadX └── leadY
|
||||
// │ ├── engA (leaf)
|
||||
// │ └── engB (leaf, sibling of engA)
|
||||
//
|
||||
// Watch-fail intent: relax the parent_id scoping and a leaf under rootX
|
||||
// could message a leaf under rootY (cross-tenant leak), or two org roots
|
||||
// (both parent_id NULL) would be treated as siblings. CanCommunicate
|
||||
// reads the package-global db.DB, which integrationAuthDB hot-swaps to
|
||||
// the test connection.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestIntegration_CanCommunicate_HierarchyAndCrossTenantIsolation(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
|
||||
rootX := insertWorkspace(t, conn, "rootX", "online", "")
|
||||
leadX := insertWorkspace(t, conn, "leadX", "online", rootX)
|
||||
engA := insertWorkspace(t, conn, "engA", "online", leadX)
|
||||
engB := insertWorkspace(t, conn, "engB", "online", leadX)
|
||||
|
||||
rootY := insertWorkspace(t, conn, "rootY", "online", "")
|
||||
leadY := insertWorkspace(t, conn, "leadY", "online", rootY)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
caller string
|
||||
target string
|
||||
want bool
|
||||
}{
|
||||
{"self to self", engA, engA, true},
|
||||
{"siblings (same parent leadX)", engA, engB, true},
|
||||
{"direct parent to child", leadX, engA, true},
|
||||
{"direct child to parent", engA, leadX, true},
|
||||
{"distant ancestor to descendant", rootX, engA, true},
|
||||
{"distant descendant to ancestor", engA, rootX, true},
|
||||
// Cross-tenant non-leak: nothing under rootX may talk to rootY tree.
|
||||
{"cross-tenant leaf to leaf", engA, leadY, false},
|
||||
{"cross-tenant leaf to other root", engA, rootY, false},
|
||||
{"cross-tenant root to root", rootX, rootY, false},
|
||||
// Two org roots both have parent_id NULL — must NOT be siblings.
|
||||
{"two org roots are not siblings", rootX, rootY, false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := registry.CanCommunicate(tc.caller, tc.target); got != tc.want {
|
||||
t.Fatalf("CanCommunicate(%s, %s) = %v, want %v", tc.caller, tc.target, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 4 — orgtoken revoke / validate row-state.
|
||||
//
|
||||
// Watch-fail intent: drop `revoked_at IS NULL` from Validate (or the
|
||||
// `AND revoked_at IS NULL` from Revoke's idempotency) and a revoked org-
|
||||
// admin token would keep authenticating, or a second revoke would report
|
||||
// success on an already-dead token. sqlmock is satisfied by "an UPDATE
|
||||
// fired" and cannot observe the row no longer authenticates.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestIntegration_OrgToken_RevokeStopsValidation(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// org_id references workspaces(id); anchor the token to a real org root.
|
||||
org := insertWorkspace(t, conn, "org-root", "online", "")
|
||||
|
||||
plaintext, id, err := orgtoken.Issue(ctx, conn, "ci-token", "tester", org)
|
||||
if err != nil {
|
||||
t.Fatalf("Issue: %v", err)
|
||||
}
|
||||
|
||||
// Live token validates and reports its org anchor.
|
||||
gotID, _, gotOrg, err := orgtoken.Validate(ctx, conn, plaintext)
|
||||
if err != nil {
|
||||
t.Fatalf("Validate (live): %v", err)
|
||||
}
|
||||
if gotID != id {
|
||||
t.Fatalf("Validate id = %q, want %q", gotID, id)
|
||||
}
|
||||
if gotOrg != org {
|
||||
t.Fatalf("Validate org_id = %q, want %q (org-anchor regressed)", gotOrg, org)
|
||||
}
|
||||
|
||||
// First revoke flips live → revoked.
|
||||
transitioned, err := orgtoken.Revoke(ctx, conn, id)
|
||||
if err != nil {
|
||||
t.Fatalf("Revoke: %v", err)
|
||||
}
|
||||
if !transitioned {
|
||||
t.Fatalf("Revoke: want true (live→revoked), got false")
|
||||
}
|
||||
|
||||
// Revoked token MUST NOT validate.
|
||||
if _, _, _, err := orgtoken.Validate(ctx, conn, plaintext); err != orgtoken.ErrInvalidToken {
|
||||
t.Fatalf("Validate after revoke: want ErrInvalidToken, got %v (revoked_at filter regressed)", err)
|
||||
}
|
||||
|
||||
// Idempotent re-revoke reports false (already revoked), not an error.
|
||||
transitioned2, err := orgtoken.Revoke(ctx, conn, id)
|
||||
if err != nil {
|
||||
t.Fatalf("re-Revoke: %v", err)
|
||||
}
|
||||
if transitioned2 {
|
||||
t.Fatalf("re-Revoke: want false (already revoked), got true (idempotency guard regressed)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_OrgToken_ListExcludesRevoked(t *testing.T) {
|
||||
conn := integrationAuthDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
org := insertWorkspace(t, conn, "org-root", "online", "")
|
||||
|
||||
_, liveID, err := orgtoken.Issue(ctx, conn, "live", "tester", org)
|
||||
if err != nil {
|
||||
t.Fatalf("Issue live: %v", err)
|
||||
}
|
||||
_, deadID, err := orgtoken.Issue(ctx, conn, "dead", "tester", org)
|
||||
if err != nil {
|
||||
t.Fatalf("Issue dead: %v", err)
|
||||
}
|
||||
if _, err := orgtoken.Revoke(ctx, conn, deadID); err != nil {
|
||||
t.Fatalf("Revoke dead: %v", err)
|
||||
}
|
||||
|
||||
// List must return only the live token (exercises the real
|
||||
// COALESCE(org_id::text,'') cast that sqlmock can't type-check —
|
||||
// see the comment in orgtoken.List).
|
||||
tokens, err := orgtoken.List(ctx, conn)
|
||||
if err != nil {
|
||||
t.Fatalf("List: %v", err)
|
||||
}
|
||||
if len(tokens) != 1 {
|
||||
t.Fatalf("List returned %d tokens, want 1 (revoked-exclusion regressed)", len(tokens))
|
||||
}
|
||||
if tokens[0].ID != liveID {
|
||||
t.Fatalf("List returned id %q, want live id %q", tokens[0].ID, liveID)
|
||||
}
|
||||
if tokens[0].OrgID != org {
|
||||
t.Fatalf("List org_id = %q, want %q", tokens[0].OrgID, org)
|
||||
}
|
||||
|
||||
// HasAnyLive is true with one live token, false once it's revoked.
|
||||
if ok, err := orgtoken.HasAnyLive(ctx, conn); err != nil || !ok {
|
||||
t.Fatalf("HasAnyLive (one live): ok=%v err=%v, want true,nil", ok, err)
|
||||
}
|
||||
if _, err := orgtoken.Revoke(ctx, conn, liveID); err != nil {
|
||||
t.Fatalf("Revoke live: %v", err)
|
||||
}
|
||||
if ok, err := orgtoken.HasAnyLive(ctx, conn); err != nil || ok {
|
||||
t.Fatalf("HasAnyLive (none live): ok=%v err=%v, want false,nil", ok, err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user