Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 061186212b | |||
| 12b2a86368 | |||
| a77568ac09 | |||
| 5ea8aef21c |
@@ -268,34 +268,6 @@ def api(
|
||||
return status, {"_raw": raw.decode("utf-8", errors="replace")}
|
||||
|
||||
|
||||
def api_paginated(
|
||||
method: str,
|
||||
path: str,
|
||||
*,
|
||||
query: dict[str, str] | None = None,
|
||||
page_size: int = 50,
|
||||
) -> list[dict]:
|
||||
"""Fetch all pages of a paginated Gitea list endpoint.
|
||||
|
||||
Gitea paginates with `page` (1-indexed) and `limit`. We loop until a
|
||||
page returns fewer than `page_size` items, indicating the end.
|
||||
"""
|
||||
results: list[dict] = []
|
||||
page = 1
|
||||
while True:
|
||||
page_query = dict(query or {})
|
||||
page_query["page"] = str(page)
|
||||
page_query["limit"] = str(page_size)
|
||||
_, body = api(method, path, query=page_query)
|
||||
if not isinstance(body, list):
|
||||
raise ApiError(f"{path} paginated response not list")
|
||||
results.extend(body)
|
||||
if len(body) < page_size:
|
||||
break
|
||||
page += 1
|
||||
return results
|
||||
|
||||
|
||||
def required_contexts(raw: str) -> list[str]:
|
||||
return [part.strip() for part in raw.split(",") if part.strip()]
|
||||
|
||||
@@ -328,18 +300,17 @@ def _is_tier_low_pending_ok(
|
||||
) -> bool:
|
||||
"""Return True if tier:low PR can tolerate sop-checklist pending state.
|
||||
|
||||
GENERIC PENDING-AS-GREEN REMOVED (Researcher + CR2 RC on #2368):
|
||||
The prior soft-fail accepted ANY pending sop-checklist for tier:low,
|
||||
which allowed required checks to pass without genuine verification.
|
||||
Pending required sop-checklist must now always HOLD and appear in
|
||||
missing_or_bad. This function is retained as a policy hook but
|
||||
currently always returns False so pending never counts green.
|
||||
|
||||
If a positively identifiable genuine soft-fail state is defined in
|
||||
future (e.g., a specific check-run conclusion), implement it here
|
||||
with strict positive identification — never default to pass.
|
||||
Per sop-checklist-config.yaml tier_failure_mode, tier:low uses soft-fail:
|
||||
sop-checklist posts state=pending when acks are satisfied (missing
|
||||
manager/ceo acks are informational only). The queue should accept
|
||||
pending instead of waiting for success.
|
||||
"""
|
||||
return False
|
||||
if "tier:low" not in pr_labels:
|
||||
return False
|
||||
if "sop-checklist" not in context:
|
||||
return False
|
||||
status = latest_statuses.get(context) or {}
|
||||
return status_state(status) == "pending"
|
||||
|
||||
|
||||
def required_contexts_green(
|
||||
@@ -688,23 +659,32 @@ def get_combined_status(sha: str) -> dict:
|
||||
"""Combined status + all individual statuses for `sha`.
|
||||
|
||||
The /status endpoint caps the `statuses` array at 30 entries (Gitea
|
||||
default page size), so we fetch the full list via /statuses. The combined
|
||||
`state` still comes from /status.
|
||||
default page size), so we fetch the full list via /statuses with a
|
||||
higher limit. The combined `state` still comes from /status.
|
||||
|
||||
Fail-closed: BOTH the PRIMARY /status fetch AND the SECONDARY /statuses
|
||||
enrichment must succeed. If either raises, the error propagates so the
|
||||
caller skips this PR this tick (we never treat a failed status fetch as
|
||||
green — dev-sop "no fail-open"). A paginated /statuses error must NOT
|
||||
silently degrade to an incomplete status set.
|
||||
Fail-closed: the PRIMARY /status fetch must succeed. If it raises, the
|
||||
error propagates so the caller skips this PR this tick (we never treat a
|
||||
failed status fetch as green — dev-sop "no fail-open"). Only the SECONDARY
|
||||
/statuses enrichment (which merely extends the per-context list beyond the
|
||||
30-entry cap) is best-effort; if it fails we still have the combined set.
|
||||
"""
|
||||
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
|
||||
if not isinstance(combined, dict):
|
||||
raise ApiError(f"status for {sha} response not object")
|
||||
combined_statuses: list[dict] = combined.get("statuses") or []
|
||||
all_statuses = api_paginated(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
|
||||
)
|
||||
try:
|
||||
_, all_statuses_raw = api(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
|
||||
query={"limit": "50"},
|
||||
)
|
||||
if isinstance(all_statuses_raw, list):
|
||||
all_statuses: list[dict] = list(all_statuses_raw)
|
||||
else:
|
||||
all_statuses = []
|
||||
except (ApiError, urllib.error.URLError, TimeoutError, OSError) as exc:
|
||||
sys.stderr.write(f"::warning::could not fetch full statuses list for {sha[:8]}: {exc}\n")
|
||||
all_statuses = []
|
||||
# Build latest per context: process combined (ascending→reverse=newest
|
||||
# first), then fill gaps from all_statuses (already newest-first).
|
||||
latest: dict[str, dict] = {}
|
||||
@@ -721,15 +701,19 @@ def get_combined_status(sha: str) -> dict:
|
||||
|
||||
|
||||
def list_queued_issues() -> list[dict]:
|
||||
return api_paginated(
|
||||
_, body = api(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/issues",
|
||||
query={
|
||||
"state": "open",
|
||||
"type": "pulls",
|
||||
"labels": QUEUE_LABEL,
|
||||
"limit": "50",
|
||||
},
|
||||
)
|
||||
if not isinstance(body, list):
|
||||
raise ApiError("queued issues response not list")
|
||||
return body
|
||||
|
||||
|
||||
def list_candidate_issues(*, auto_discover: bool) -> list[dict]:
|
||||
@@ -743,14 +727,18 @@ def list_candidate_issues(*, auto_discover: bool) -> list[dict]:
|
||||
"""
|
||||
if not auto_discover:
|
||||
return list_queued_issues()
|
||||
return api_paginated(
|
||||
_, body = api(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/issues",
|
||||
query={
|
||||
"state": "open",
|
||||
"type": "pulls",
|
||||
"limit": "50",
|
||||
},
|
||||
)
|
||||
if not isinstance(body, list):
|
||||
raise ApiError("candidate issues response not list")
|
||||
return body
|
||||
|
||||
|
||||
def get_pull(pr_number: int) -> dict:
|
||||
@@ -1110,18 +1098,18 @@ def main() -> int:
|
||||
try:
|
||||
return process_once(dry_run=args.dry_run)
|
||||
except ApiError as exc:
|
||||
# FAIL-CLOSED: API errors are not "transient success" — they mean
|
||||
# the queue could not evaluate merge state. Returning 0 hides
|
||||
# persistent infra issues (auth drift, endpoint outages) from
|
||||
# operators. Return 1 so the cron job surfaces red and paging fires.
|
||||
# API errors (401/403/404/500) are transient for a queue tick —
|
||||
# log and exit 0 so the workflow is not marked failed and the next
|
||||
# tick can retry. Returning non-zero would permanently fail the
|
||||
# workflow run, blocking future ticks.
|
||||
sys.stderr.write(f"::error::queue API error: {exc}\n")
|
||||
return 1
|
||||
return 0
|
||||
except urllib.error.URLError as exc:
|
||||
sys.stderr.write(f"::error::queue network error: {exc}\n")
|
||||
return 1
|
||||
return 0
|
||||
except TimeoutError as exc:
|
||||
sys.stderr.write(f"::error::queue timeout: {exc}\n")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -546,24 +546,16 @@ def verify_flip(flip: dict, branch: str, n: int) -> dict:
|
||||
|
||||
shas = recent_commits_on_branch(branch, n)
|
||||
if not shas:
|
||||
result["masked_runs"].append({
|
||||
"sha": "",
|
||||
"status": "unverified",
|
||||
"target_url": "",
|
||||
"samples": [f"no recent commits on {branch} — cannot verify flip"],
|
||||
})
|
||||
result["warnings"].append(
|
||||
f"no recent commits on {branch} (cannot verify flip)"
|
||||
)
|
||||
return result
|
||||
|
||||
for sha in shas:
|
||||
try:
|
||||
status_doc = combined_status(sha)
|
||||
except ApiError as e:
|
||||
result["masked_runs"].append({
|
||||
"sha": sha,
|
||||
"status": "error",
|
||||
"target_url": "",
|
||||
"samples": [f"combined-status API error: {e}"],
|
||||
})
|
||||
result["warnings"].append(f"combined-status for {sha}: {e}")
|
||||
continue
|
||||
statuses = status_doc.get("statuses") or []
|
||||
# First entry matching the context name. Newest SHAs come
|
||||
@@ -590,17 +582,6 @@ def verify_flip(flip: dict, branch: str, n: int) -> dict:
|
||||
"target_url": target_url,
|
||||
"samples": ["[log unavailable; status itself is " + state + "]"],
|
||||
})
|
||||
elif state == "success":
|
||||
# Fail-closed: unreadable log on a success status is a
|
||||
# potential Quirk #10 mask (continue-on-error hiding real
|
||||
# failures). We cannot verify it's clean, so treat as
|
||||
# masked rather than allowing the flip.
|
||||
result["masked_runs"].append({
|
||||
"sha": sha,
|
||||
"status": state,
|
||||
"target_url": target_url,
|
||||
"samples": ["[log unavailable; cannot verify status is genuine — treat as masked]"],
|
||||
})
|
||||
break
|
||||
samples = grep_fail_markers(log_text)
|
||||
if state in ("failure", "error"):
|
||||
@@ -624,12 +605,10 @@ def verify_flip(flip: dict, branch: str, n: int) -> dict:
|
||||
break
|
||||
|
||||
if result["checked_commits"] == 0:
|
||||
result["masked_runs"].append({
|
||||
"sha": "",
|
||||
"status": "unverified",
|
||||
"target_url": "",
|
||||
"samples": [f"no runs of {target_context!r} found in the last {n} commits on {branch} — cannot verify flip"],
|
||||
})
|
||||
result["warnings"].append(
|
||||
f"no runs of {target_context!r} found in the last {n} commits on "
|
||||
f"{branch} — cannot verify; allowing flip with warning"
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
|
||||
@@ -689,8 +689,8 @@ def reap_branch(
|
||||
shas = list_recent_commit_shas(branch, limit)
|
||||
except ApiError as e:
|
||||
print(
|
||||
"::error::status-reaper cannot run: commit-list API failed "
|
||||
f"after retries: {e}"
|
||||
"::warning::status-reaper skipped this tick because the "
|
||||
f"commit list could not be read after retries: {e}"
|
||||
)
|
||||
return {
|
||||
"scanned_shas": 0,
|
||||
@@ -704,7 +704,6 @@ def reap_branch(
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_per_sha": {},
|
||||
"sha_api_errors": 0,
|
||||
"skipped": True,
|
||||
"skip_reason": "commit-list-api-error",
|
||||
}
|
||||
@@ -721,7 +720,6 @@ def reap_branch(
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_per_sha": {},
|
||||
"sha_api_errors": 0,
|
||||
}
|
||||
|
||||
for sha in shas:
|
||||
@@ -733,9 +731,8 @@ def reap_branch(
|
||||
try:
|
||||
combined = get_combined_status(sha)
|
||||
except ApiError as e:
|
||||
aggregate["sha_api_errors"] += 1
|
||||
print(
|
||||
f"::error::get_combined_status({sha[:10]}) failed; "
|
||||
f"::warning::get_combined_status({sha[:10]}) failed; "
|
||||
f"skipping this SHA: {e}"
|
||||
)
|
||||
continue
|
||||
@@ -822,14 +819,6 @@ def main() -> int:
|
||||
sort_keys=True,
|
||||
)
|
||||
)
|
||||
# Observability: infra-failure → red. If the commit list could not be
|
||||
# read or any per-SHA status fetch failed, the tick is incomplete and
|
||||
# must be observable as a failure (non-zero exit) so the cron bot or
|
||||
# runner surface alerts.
|
||||
if counters.get("skipped"):
|
||||
return 1
|
||||
if counters.get("sha_api_errors", 0) > 0:
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -2,8 +2,6 @@ import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
SCRIPT = Path(__file__).resolve().parents[1] / "gitea-merge-queue.py"
|
||||
spec = importlib.util.spec_from_file_location("gitea_merge_queue", SCRIPT)
|
||||
mq = importlib.util.module_from_spec(spec)
|
||||
@@ -46,35 +44,6 @@ def test_required_contexts_green_rejects_missing_and_pending():
|
||||
]
|
||||
|
||||
|
||||
def test_required_contexts_green_rejects_volume_skipped_even_for_tier_low():
|
||||
"""volume-skipped pending is a partial view, not a genuine soft-fail.
|
||||
|
||||
Per sop-checklist.py:1179-1187, volume_skipped posts pending with a
|
||||
'[volume-skipped]' prefix. The merge queue must NOT treat this as an
|
||||
acceptable soft-fail for tier:low — the gate did not finish evaluating.
|
||||
"""
|
||||
latest = mq.latest_statuses_by_context([
|
||||
{"context": "CI / all-required (pull_request)", "status": "success"},
|
||||
{
|
||||
"context": "sop-checklist / all-items-acked (pull_request)",
|
||||
"status": "pending",
|
||||
"description": "[volume-skipped] comment-cap=1000 hit; please file ...",
|
||||
},
|
||||
])
|
||||
|
||||
ok, missing_or_bad = mq.required_contexts_green(
|
||||
latest,
|
||||
[
|
||||
"CI / all-required (pull_request)",
|
||||
"sop-checklist / all-items-acked (pull_request)",
|
||||
],
|
||||
pr_labels={"tier:low"},
|
||||
)
|
||||
|
||||
assert ok is False
|
||||
assert "sop-checklist / all-items-acked (pull_request)=pending" in missing_or_bad
|
||||
|
||||
|
||||
def test_choose_next_pr_sorts_by_queue_label_timestamp_then_number():
|
||||
issues = [
|
||||
{
|
||||
@@ -563,61 +532,6 @@ def test_status_fetch_failure_is_fail_closed(monkeypatch):
|
||||
assert merged["called"] is False
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Pagination: api_paginated loops pages and is fail-closed on page errors
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
def test_api_paginated_loops_pages_until_partial(monkeypatch):
|
||||
"""api_paginated fetches all pages and stops when a page is < page_size."""
|
||||
calls = []
|
||||
|
||||
def fake_api(method, path, *, query=None, **kw):
|
||||
page = int((query or {}).get("page", "1"))
|
||||
limit = int((query or {}).get("limit", "50"))
|
||||
calls.append((page, limit))
|
||||
if page == 1:
|
||||
return 200, [{"number": 1}, {"number": 2}]
|
||||
if page == 2:
|
||||
return 200, [{"number": 3}]
|
||||
return 200, []
|
||||
|
||||
monkeypatch.setattr(mq, "api", fake_api)
|
||||
results = mq.api_paginated("GET", "/repos/o/r/issues", page_size=2)
|
||||
assert len(results) == 3
|
||||
assert results[0]["number"] == 1
|
||||
assert results[1]["number"] == 2
|
||||
assert results[2]["number"] == 3
|
||||
assert calls == [(1, 2), (2, 2)]
|
||||
|
||||
|
||||
def test_api_paginated_raises_on_non_list(monkeypatch):
|
||||
"""A page that returns a dict instead of list is an error."""
|
||||
def fake_api(method, path, *, query=None, **kw):
|
||||
return 200, {"message": "not found"}
|
||||
|
||||
monkeypatch.setattr(mq, "api", fake_api)
|
||||
with pytest.raises(mq.ApiError):
|
||||
mq.api_paginated("GET", "/repos/o/r/issues")
|
||||
|
||||
|
||||
def test_get_combined_status_propagates_paginated_statuses_error(monkeypatch):
|
||||
"""If the paginated /statuses enrichment raises, the error propagates
|
||||
(fail-closed — we do NOT silently fall back to an incomplete status set)."""
|
||||
monkeypatch.setattr(mq, "OWNER", "o")
|
||||
monkeypatch.setattr(mq, "NAME", "r")
|
||||
|
||||
def fake_api(method, path, *, query=None, **kw):
|
||||
if path.endswith("/status"):
|
||||
return 200, {"state": "success", "statuses": [{"context": "c1", "status": "success", "id": 1}]}
|
||||
if path.endswith("/statuses"):
|
||||
raise mq.ApiError("GET /statuses -> HTTP 502")
|
||||
raise mq.ApiError(f"unexpected {path}")
|
||||
|
||||
monkeypatch.setattr(mq, "api", fake_api)
|
||||
with pytest.raises(mq.ApiError, match="GET /statuses"):
|
||||
mq.get_combined_status("a" * 40)
|
||||
|
||||
|
||||
def test_process_once_holds_tick_when_branch_protection_unavailable(monkeypatch):
|
||||
"""BP enumeration failure → HOLD the whole tick (no merge, rc 0)."""
|
||||
merged = {"called": False}
|
||||
|
||||
@@ -320,10 +320,10 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
self.assertEqual(len(verdict["fail_runs"]), 1)
|
||||
self.assertEqual(verdict["fail_runs"][0]["status"], "failure")
|
||||
|
||||
def test_unreadable_log_on_success_blocks(self):
|
||||
# Fail-closed: log fetch 404 (None) on a success status is a
|
||||
# potential Quirk #10 mask — we cannot verify it's genuine, so
|
||||
# we block the flip rather than allowing it.
|
||||
def test_unreadable_log_warns_not_blocks(self):
|
||||
# Acceptance test #5: log fetch 404 (None) → warn, not block.
|
||||
# Status is `success`, log is None — we can't tell, so we warn
|
||||
# and allow.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=["sha1"]):
|
||||
with mock.patch.object(
|
||||
lpfc, "combined_status",
|
||||
@@ -332,8 +332,7 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
with mock.patch.object(lpfc, "fetch_log", return_value=None):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
self.assertEqual(len(verdict["masked_runs"]), 1)
|
||||
self.assertIn("log unavailable", verdict["masked_runs"][0]["samples"][0])
|
||||
self.assertEqual(verdict["masked_runs"], [])
|
||||
self.assertTrue(any("log unavailable" in w for w in verdict["warnings"]))
|
||||
|
||||
def test_unreadable_log_with_failure_status_still_blocks(self):
|
||||
@@ -350,9 +349,9 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
self.assertEqual(len(verdict["fail_runs"]), 1)
|
||||
self.assertIn("log unavailable", verdict["fail_runs"][0]["samples"][0])
|
||||
|
||||
def test_zero_runs_history_blocks(self):
|
||||
# No commits with a matching context — cannot verify the flip.
|
||||
# Fail-closed: treat as masked rather than allowing.
|
||||
def test_zero_runs_history_warns_allows(self):
|
||||
# No commits with a matching context — newly added workflow.
|
||||
# Allow with warning.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=["sha1", "sha2"]):
|
||||
with mock.patch.object(
|
||||
lpfc, "combined_status",
|
||||
@@ -361,32 +360,17 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["checked_commits"], 0)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
self.assertEqual(len(verdict["masked_runs"]), 1)
|
||||
self.assertIn("cannot verify flip", verdict["masked_runs"][0]["samples"][0])
|
||||
self.assertEqual(verdict["masked_runs"], [])
|
||||
self.assertTrue(any("no runs of" in w for w in verdict["warnings"]))
|
||||
|
||||
def test_zero_commits_blocks(self):
|
||||
# Empty branch (newly created repo, e.g.). Fail-closed: block.
|
||||
def test_zero_commits_warns_allows(self):
|
||||
# Empty branch (newly created repo, e.g.). Allow with warning.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=[]):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["checked_commits"], 0)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
self.assertEqual(len(verdict["masked_runs"]), 1)
|
||||
self.assertIn("cannot verify flip", verdict["masked_runs"][0]["samples"][0])
|
||||
|
||||
def test_combined_status_api_error_blocks(self):
|
||||
# Fail-closed: combined_status ApiError means the check history is
|
||||
# unreadable — we cannot verify the flip, so block as masked.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=["sha1"]):
|
||||
with mock.patch.object(
|
||||
lpfc, "combined_status",
|
||||
side_effect=lpfc.ApiError("GET /statuses/sha → HTTP 500"),
|
||||
):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["checked_commits"], 0)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
# One masked_run from the ApiError, one from zero checked_commits.
|
||||
self.assertEqual(len(verdict["masked_runs"]), 2)
|
||||
self.assertIn("API error", verdict["masked_runs"][0]["samples"][0])
|
||||
self.assertEqual(verdict["masked_runs"], [])
|
||||
self.assertTrue(any("no recent commits" in w for w in verdict["warnings"]))
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
@@ -77,16 +77,12 @@ env:
|
||||
jobs:
|
||||
detect-changes:
|
||||
name: detect-changes
|
||||
# mc#1529 §1: pin to `docker-host` so the integration job runs on the
|
||||
# operator-host runners (molecule-runner-*), which carry the
|
||||
# `molecule-core-net` bridge network this workflow depends on. PC2
|
||||
# runners (hongming-pc-runner-*) also advertise ubuntu-latest but
|
||||
# don't have that network — the previous `runs-on: ubuntu-latest`
|
||||
# rolled the dice and hard-failed the bridge-inspect step ~30% of
|
||||
# the time. detect-changes itself doesn't need the bridge, but keeping
|
||||
# both jobs on the same label avoids workspace-volume cross-host
|
||||
# surprises and keeps the routing rule discoverable in one place.
|
||||
runs-on: docker-host
|
||||
# mc#1529 §1 + internal#797: pin to `docker-host` runners that ALSO
|
||||
# advertise the `molecule-core-net` label. This bridges the gap where
|
||||
# some `docker-host` runners were provisioned without the network.
|
||||
# The array means the runner must satisfy BOTH labels (Gitea Actions
|
||||
# label-AND semantics). See status-reaper.yml for the same pattern.
|
||||
runs-on: [docker-host, molecule-core-net]
|
||||
# mc#1982 Phase 3 (RFC §1): surface broken workflows without blocking.
|
||||
# mc#1982: mask removed. If regressions appear, root-fix the underlying
|
||||
# test — do NOT renew the mask silently.
|
||||
@@ -116,9 +112,10 @@ jobs:
|
||||
integration:
|
||||
name: Handlers Postgres Integration
|
||||
needs: detect-changes
|
||||
# mc#1529 §1: must run on operator-host (where `molecule-core-net`
|
||||
# exists). See detect-changes for the full routing rationale.
|
||||
runs-on: docker-host
|
||||
# mc#1529 §1 + internal#797: must run on operator-host where
|
||||
# `molecule-core-net` exists. See detect-changes for the full
|
||||
# routing rationale.
|
||||
runs-on: [docker-host, molecule-core-net]
|
||||
# mc#1982 Phase 3 (RFC §1): surface broken workflows without blocking.
|
||||
# mc#1982: mask removed. If regressions appear, root-fix the underlying
|
||||
# test — do NOT renew the mask silently.
|
||||
|
||||
@@ -61,9 +61,11 @@ name: Lint pre-flip continue-on-error
|
||||
# feedback_no_shared_persona_token_use.
|
||||
#
|
||||
# Phase contract (RFC internal#219 §1 ladder):
|
||||
# - Flipped to `continue-on-error: false` after Researcher live-verified
|
||||
# clean runs. The script's own 35 pytest tests pass and recent PR
|
||||
# history shows no masked regressions — the gate is now enforcing.
|
||||
# - This workflow lands at `continue-on-error: true` (Phase 3 —
|
||||
# surface defects without blocking). Follow-up PR flips it to
|
||||
# `false` ONLY after this workflow's own recent runs on `main`
|
||||
# are confirmed clean — exactly the discipline the workflow
|
||||
# itself enforces. Eat your own dogfood.
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
@@ -95,9 +97,10 @@ jobs:
|
||||
name: Verify continue-on-error flips have run-log proof
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 8
|
||||
# Fail-closed: the lint script is verified clean (35/35 tests pass,
|
||||
# Researcher live-check confirmed). Masking removed per mc#1982 close-out.
|
||||
continue-on-error: false
|
||||
# Phase 3 (RFC internal#219 §1): surface broken flips without blocking
|
||||
# the PR yet. Follow-up flips this to `false` once the workflow itself
|
||||
# has clean recent runs on main. mc#1982 interim — remove when CoE→false.
|
||||
continue-on-error: true # mc#1982
|
||||
steps:
|
||||
- name: Check out PR head (full history for base-SHA access)
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
@@ -1,293 +0,0 @@
|
||||
# RFC: Org-level Platform Agent — a tenant-resident concierge
|
||||
|
||||
**Perspective:** CTO + Backend Engineer + DevOps
|
||||
**Status:** Draft — pre-implementation, **CTO sign-off required before any implementation PR**
|
||||
**Scope:** `molecule-core` (workspace-server), `molecule-controlplane`, workspace runtime, `molecule-app`
|
||||
**This document is the single source of truth (SSOT) for the feature.** Code, OpenAPI, the platform
|
||||
MCP, and end-user docs reconcile to this RFC — not to each other.
|
||||
|
||||
---
|
||||
|
||||
## 1. Summary
|
||||
|
||||
Today a Molecule tenant is a control/router box: one EC2 runs the `workspace-server`
|
||||
(`molecule-tenant` container) + Postgres + Redis, and **each workspace is its own separate EC2**
|
||||
running a runtime image that joins the tenant's A2A mesh. A2A has exactly two participant kinds:
|
||||
**workspaces** (agents) and the **user** (the canvas, modeled implicitly as `activity_logs.source_id
|
||||
IS NULL`). A user who wants to *do* anything must drive individual workspaces directly — create them,
|
||||
assign agents, wire channels/schedules/secrets — i.e. they must carry a lot of platform knowledge.
|
||||
|
||||
This RFC introduces a **platform agent**: an always-on org-level agent that
|
||||
|
||||
1. runs as a **container on the tenant EC2** itself (beside `molecule-tenant`),
|
||||
2. natively holds the **platform-management MCP** (the org-admin tool surface) so it can do anything
|
||||
in the org,
|
||||
3. joins A2A as a **first-class third participant** (`kind='platform'`) that sits at the org root, and
|
||||
4. becomes the **user's default chat target** — a concierge the user talks to like a chatbot, which
|
||||
then orchestrates the org on their behalf.
|
||||
|
||||
Destructive actions the concierge triggers are **human-approved** through the existing approvals
|
||||
subsystem.
|
||||
|
||||
## 2. Motivation
|
||||
|
||||
- **Lower the knowledge floor.** "Spin up an SEO team and have them publish weekly" should be a
|
||||
sentence, not a sequence of workspace/agent/schedule/secret operations.
|
||||
- **One front door.** A single conversational entry point that *is* the org, instead of N per-workspace
|
||||
chats the user has to coordinate.
|
||||
- **Reuse, don't rebuild.** The agent runtime, A2A mesh, the 87-tool platform MCP, and the approvals
|
||||
subsystem already exist. This feature is mostly *composition* plus one honest new participant kind.
|
||||
|
||||
## 3. Goals / Non-Goals
|
||||
|
||||
**Goals**
|
||||
- A per-tenant platform agent, provisioned automatically, that controls the org via the platform MCP.
|
||||
- A first-class `platform` participant in A2A with correct routing and tenant isolation.
|
||||
- Server-side approval gating for destructive org operations.
|
||||
- Parity with normal workspaces for runtime/model/provider/billing (no special-casing).
|
||||
|
||||
**Non-Goals (this RFC)**
|
||||
- Replacing the canvas. The canvas remains the advanced/power-user surface.
|
||||
- Multi-concierge / per-team concierges. Exactly **one** platform agent per org.
|
||||
- A new scoped-down token system for the MCP (tracked separately; see §10 Open Questions).
|
||||
|
||||
## 4. Current-state ground truth (verified, with references)
|
||||
|
||||
- **Topology.** Tenant EC2 runs `molecule-tenant` (workspace-server) + Postgres + Redis;
|
||||
`controlplane/internal/provisioner/ec2.go:buildTenantUserDataSM()` `docker run`s it with
|
||||
`--network host`, `PORT=8080`. Each **workspace is its own EC2** (`ec2.go:ProvisionWorkspace`).
|
||||
- **No `org_id` column.** An "org" is the `parent_id IS NULL` subtree root;
|
||||
`workspace-server/internal/handlers/org_scope.go` resolves it with a recursive CTE (`orgRootID`) and
|
||||
`sameOrg()` compares two workspaces' resolved roots for tenant isolation (#1953/OFFSEC-015).
|
||||
- **A2A authorization is hierarchy-based.** `workspace-server/internal/registry/access.go:CanCommunicate`
|
||||
permits self / siblings / ancestor↔descendant. Root-level rows are "siblings" but every routing path
|
||||
is additionally gated by `sameOrg()`.
|
||||
- **No participant-kind discriminator.** `workspaces.role` is a free-form string; the user is implicit
|
||||
(`activity_logs.source_id IS NULL`). `migrations/001_workspaces.sql`.
|
||||
- **Runtime injects MCP servers** in the claude-code executor's `mcp_servers` dict — today exactly one
|
||||
entry, `"a2a"` (`molecule-ai-workspace-template-claude-code/claude_sdk_executor.py`,
|
||||
`molecule_runtime/claude_sdk_executor.py`). The agent self-registers via `POST /registry/register`
|
||||
(`molecule_runtime/main.py`) and is identified by `WORKSPACE_ID` + `X-Molecule-Org-Id`.
|
||||
- **Platform MCP** (`molecule-mcp-server`, stdio Node) authenticates purely from env
|
||||
(`MOLECULE_API_KEY` = org-admin token, `MOLECULE_API_URL`, `MOLECULE_ORG_ID`; `src/api.ts`), is a
|
||||
thin proxy over the tenant REST/A2A API (`chat_with_agent` → `POST /workspaces/:id/a2a`,
|
||||
`async_delegate` → `/delegate`), and has **zero embeddability blockers**.
|
||||
- **Billing** is a per-workspace resolver — `ResolveLLMBillingModeDerived`
|
||||
(`workspace-server/internal/handlers/workspace_provision.go`, `llm_billing_mode.go`), defaulting
|
||||
closed to `platform_managed`; `byok` runs on the tenant's own provider key (see
|
||||
`docs/architecture/byok-fail-closed-billing.md`).
|
||||
- **Approvals** exist: `migrations/007_approvals.sql`, `internal/handlers/approvals.go`,
|
||||
`EventApprovalRequested`, decide route `POST /workspaces/:id/approvals/:approvalId/decide`.
|
||||
|
||||
## 5. Design
|
||||
|
||||
### 5.1 The platform agent IS the org root
|
||||
|
||||
Because `sameOrg()` resolves each workspace to its topmost `parent_id IS NULL` root, a platform agent
|
||||
added as a *second* root would resolve to a *different* root than the existing team and be **blocked**
|
||||
by `sameOrg`. Therefore the platform agent **becomes the single org root**, and the org's existing
|
||||
root is **re-parented under it**. Consequences:
|
||||
|
||||
- `orgRootID(any workspace) == platform-agent-id`; `sameOrg(platform, any in-org ws) == true`.
|
||||
- The platform agent reaches every workspace (and is reachable) via the **existing**
|
||||
ancestor↔descendant rules — **no `CanCommunicate` change**, and tenant isolation is unchanged.
|
||||
|
||||
This is the honest realization of "a third participant above workspace and user": the concierge is
|
||||
literally the org.
|
||||
|
||||
### 5.2 `kind` discriminator (the only new marker)
|
||||
|
||||
Add a single column `workspaces.kind TEXT NOT NULL DEFAULT 'workspace'`, constrained to
|
||||
`('workspace','platform')`. It is the **only** marker of the platform agent — we do **not** also
|
||||
encode identity in `role`/`tier` (those stay descriptive). The enum is defined once: the migration
|
||||
`CHECK` and the Go constants `KindWorkspace`/`KindPlatform` (+ one `IsValidKind`) are kept in lockstep.
|
||||
|
||||
Invariants (handler-enforced, since there is no `org_id` for a pure-SQL unique):
|
||||
- `kind='platform' ⇒ parent_id IS NULL`.
|
||||
- A row may be `kind='platform'` only if it is its own org root (`orgRootID(self) == self`), giving
|
||||
"exactly one platform agent per org". Guard the check+write in a tx with `FOR UPDATE` on the root.
|
||||
|
||||
### 5.3 Identity & registration
|
||||
|
||||
- **ID** = derived `uuidv5(org-namespace, "platform-agent")` — reproducible, no stored-vs-derived
|
||||
drift, lowercase so it satisfies the runtime's `WORKSPACE_ID` validator.
|
||||
- CP **pre-seeds** the `workspaces` row (`kind='platform'`, `parent_id=NULL`, `tier=0`) before the
|
||||
agent boots; the agent self-registers (`POST /registry/register`) into that row. `Register` accepts
|
||||
an optional `kind` and reconciles it, enforcing the §5.2 invariants.
|
||||
|
||||
### 5.4 Default-target resolver
|
||||
|
||||
New `GET /registry/platform-agent` (handler `internal/handlers/platform_agent.go`): resolve the
|
||||
caller's `orgRootID()` and return it iff `kind='platform'`. This is the server hook the dashboard
|
||||
targets by default; no change to `ProxyA2A`. **Authored in the OpenAPI SSOT first**; MCP/CLI/docs
|
||||
derive from it.
|
||||
|
||||
### 5.5 Runtime: two MCPs, config-driven
|
||||
|
||||
Make the runtime's `mcp_servers` **config-driven** rather than hardcoded:
|
||||
- `molecule_runtime/config.py`: add `extra_mcp_servers: list[dict]` to `WorkspaceConfig`, read
|
||||
`raw.get("mcp_servers", [])`.
|
||||
- Both executors merge `extra_mcp_servers` into the `mcp_servers` dict after the always-on `"a2a"`
|
||||
entry (the template `claude_sdk_executor.py` is the live one; the runtime-package copy is the
|
||||
fallback).
|
||||
|
||||
The platform agent's `config.yaml` then declares:
|
||||
|
||||
```yaml
|
||||
runtime: claude-code
|
||||
model: sonnet # default; user-switchable model AND provider via providers.yaml
|
||||
a2a:
|
||||
port: 8090 # avoid the workspace default 8000 under host networking
|
||||
mcp_servers:
|
||||
- name: platform
|
||||
command: node
|
||||
args: ["/opt/molecule-mcp-server/dist/index.js"]
|
||||
```
|
||||
|
||||
The `platform` MCP reads `MOLECULE_API_KEY`/`MOLECULE_API_URL`/`MOLECULE_ORG_ID` from the container
|
||||
env (passed through to the stdio child) — no per-server `env` block needed.
|
||||
|
||||
### 5.6 Hosting & provisioning (tenant EC2 container)
|
||||
|
||||
In `ec2.go:buildTenantUserDataSM()` add a `start_platform_agent` stage **after** `wait_platform_health`
|
||||
(the agent registers against `localhost:8080` on boot):
|
||||
|
||||
```bash
|
||||
docker run -d --restart=always --name molecule-platform-agent --network host \
|
||||
-v /data/platform-agent/configs:/configs \
|
||||
-e WORKSPACE_ID=<platform-uuid> -e WORKSPACE_CONFIG_PATH=/configs \
|
||||
-e PLATFORM_URL=http://localhost:8080 \
|
||||
-e MOLECULE_API_URL=http://localhost:8080 -e MOLECULE_API_KEY=$ADMIN_TOKEN -e MOLECULE_ORG_ID=<orgID> \
|
||||
-e ANTHROPIC_AUTH_TOKEN=$ADMIN_TOKEN -e MOLECULE_LLM_ANTHROPIC_BASE_URL=$MOLECULE_LLM_ANTHROPIC_BASE_URL \
|
||||
<platform-agent-image>
|
||||
```
|
||||
|
||||
- The org `admin_token` is already on the box (Secrets Manager `molecule/tenant/{orgID}`).
|
||||
- `--restart=always` provides Docker-level supervision (matches `molecule-tenant`).
|
||||
- Mirror the block into the redeploy path (`buildRedeployScript`) so existing tenants backfill it.
|
||||
|
||||
### 5.7 Image
|
||||
|
||||
A **dedicated `molecule-platform-agent` image**: `FROM workspace-template-claude-code`, `COPY` the
|
||||
prebuilt `molecule-mcp-server/dist` + `node_modules` into `/opt/molecule-mcp-server`, and **pin Node
|
||||
20** (the slim base ships Node 18; the MCP expects ≥20). A dedicated image keeps the org-admin MCP
|
||||
**out of** ordinary workspace images (security hygiene) and lets us set concierge defaults without
|
||||
touching the workspace template. `molecule-ci` publishes it.
|
||||
|
||||
### 5.8 Approval gate (server-side trust boundary)
|
||||
|
||||
The MCP is a *client* of the tenant handlers, so enforcement lives in the **handlers**, not the MCP.
|
||||
|
||||
- `internal/approvals/policy.go` (new): one auditable map of gated actions —
|
||||
`delete_workspace`, `deprovision`, `secret_write`, `org_token_mint`.
|
||||
- `requireApproval(ctx, workspaceID, action, contextHash)` reuses the existing approvals
|
||||
INSERT/broadcast/escalate. If an `approved`+unconsumed row matches → consume it → proceed. Else
|
||||
create a `pending` row, broadcast `EventApprovalRequested`, and return **HTTP 202
|
||||
`{approval_id, status:"pending"}`** instead of executing. The human decides via the existing decide
|
||||
route; the agent retries and the gate now passes.
|
||||
- Add `approval_requests.consumed_at` (single-use) and optional `request_hash` (dedupe identical
|
||||
pending requests).
|
||||
- **Escalation:** the platform agent's `parent_id` is NULL, so platform-originated approvals escalate
|
||||
to the **user** (canvas notify), not a parent.
|
||||
- The 202 response shape is authored in the **OpenAPI SSOT**.
|
||||
|
||||
### 5.9 Billing & model/provider parity
|
||||
|
||||
The platform agent is a `workspaces` row, so it inherits the one billing resolver and the
|
||||
`providers.yaml` runtime matrix unchanged:
|
||||
- **Default `platform_managed`** (metered CP proxy, billed to org credits) — the env wiring in §5.6.
|
||||
- **`byok`** = flip `/admin/workspaces/:id/llm-billing-mode` + supply the org's `ANTHROPIC_API_KEY`
|
||||
secret (workspace or global). Exposed as a provisioning flag so a tenant can choose at create time.
|
||||
- Model **and provider** are switchable (Claude, Kimi-for-coding, …) via the same dashboard
|
||||
model-switcher any workspace uses.
|
||||
|
||||
### 5.10 UX (summary; detailed in app RFC / Phase 5)
|
||||
|
||||
The **dashboard** (`molecule-app`) becomes the primary entry: a concierge chat (default-targeting the
|
||||
§5.4 resolver) plus a live org overview, with pending approvals surfaced inline. The **canvas** stays
|
||||
for advanced users. First UI version is produced in Claude Design and iterated before build.
|
||||
|
||||
## 6. SSOT mapping (derive, don't fork)
|
||||
|
||||
| Concern | Single source of truth | This RFC's rule |
|
||||
|---|---|---|
|
||||
| "The org" | `orgRootID()`/`sameOrg()` (`org_scope.go`) | platform agent *becomes* the root; no `org_id` column |
|
||||
| Platform marker | `workspaces.kind` | `kind` only; never also `role`/`tier` |
|
||||
| Model/provider | `providers.yaml` runtime matrix | concierge switches via the same registry |
|
||||
| LLM billing | `ResolveLLMBillingModeDerived` | inherits the one resolver; no new path |
|
||||
| Config/secrets delivery | tenant Secrets Manager bundle (`seedWorkspaceConfigSecret`) | no new S3 prefix / second store |
|
||||
| Management API | OpenAPI spec | new endpoints authored there first; MCP/CLI/docs derive |
|
||||
| Gated actions | `internal/approvals/policy.go` | one map |
|
||||
| Platform-agent id | `uuidv5(org, "platform-agent")` | derived, never stored separately |
|
||||
|
||||
## 7. Security & blast radius
|
||||
|
||||
The concierge holds the org **admin token** (full tenant-root, self-minting) and is driven by
|
||||
end-user chat. Mitigations:
|
||||
- **Approval gate (§5.8)** must ship *with* the agent going user-facing, not after. Until then the
|
||||
agent is operator-only.
|
||||
- **Tenant isolation** is unchanged — every reach path still passes `sameOrg()`.
|
||||
- **MCP not in workspace images** (dedicated image, §5.7); the admin token lives only in the
|
||||
platform-agent container env on the tenant box.
|
||||
- **Token rotation:** the MCP reads env once at spawn → rotation = `docker restart
|
||||
molecule-platform-agent` (runbook item).
|
||||
- Future: a scoped-down org token (no delete/billing/member) — see §10.
|
||||
|
||||
## 8. Migration & rollout
|
||||
|
||||
Phase ordering is the rollout contract:
|
||||
- **Phase 0** (schema) ships and bakes before anything writes `kind`. Backward-compatible: every
|
||||
existing row defaults to `kind='workspace'`; the `CHECK` is added `NOT VALID` then validated.
|
||||
- **Phase 1 re-parenting backfill** is the one real watch-item. **Before** running it, audit whether
|
||||
any org-scoped table keys off the *root workspace id* (e.g. `org_api_tokens`, `org_plugin_allowlist`)
|
||||
versus the CP org UUID. If they reference the root workspace id, re-parenting changes "the root" and
|
||||
those refs must migrate too. The backfill is per-org, idempotent, and reversible.
|
||||
- New orgs get the platform agent from first boot; existing orgs backfill via `/admin/tenants
|
||||
redeploy` + a one-time re-parent migration.
|
||||
|
||||
## 9. Implementation phases
|
||||
|
||||
0. **Schema + model** (`molecule-core`): `kind` column + `approval_requests.consumed_at`; model field +
|
||||
constants; `Register` accepts/validates `kind` with invariants.
|
||||
1. **Platform-as-root + resolver** (`molecule-core` + CP): CP pre-seeds the platform row and creates
|
||||
teams under it; per-org re-parent backfill (after the §8 audit); `GET /registry/platform-agent`.
|
||||
2. **Config-driven two-MCP runtime** (runtime + claude-code template).
|
||||
3. **Image + tenant provisioning** (CP + image + `molecule-ci`): dedicated image; `start_platform_agent`
|
||||
in user-data + redeploy; config via the tenant Secrets Manager bundle; billing knob.
|
||||
4. **Approval gate** (`molecule-core`): policy map + `requireApproval` at destructive handlers; OpenAPI
|
||||
202 shape.
|
||||
5. **Dashboard concierge UX** (`molecule-app`): design-first, then build against the resolver.
|
||||
6. **Cleanup**: exclude the platform agent from billable counts; canvas visibility; rotation runbook.
|
||||
|
||||
## 10. Open questions
|
||||
|
||||
- **Scoped-down token.** Should the concierge hold a reduced-scope token (no delete/billing/member)
|
||||
instead of full admin + an approval gate? The token-scope system does not exist yet (`orgtoken`
|
||||
TODO). Recommendation: ship admin-token + approval gate now; add scope-down as a follow-up.
|
||||
- **Re-parenting vs. wrapper.** If product later wants a platform agent that is *not* the topological
|
||||
root, a `CanCommunicateWithKind` wrapper (guarded by `sameOrg`) is the alternative. Deferred —
|
||||
platform-as-root is lower-risk and needs zero access-control change.
|
||||
- **Canvas visibility** of the root concierge node (hide vs. show as the org anchor).
|
||||
|
||||
## 11. Verification (end-to-end on a staging tenant)
|
||||
|
||||
1. **Schema:** Phase-0 migrations applied; existing workspaces report `kind='workspace'`; `go test
|
||||
./...` + `-tags=integration` green.
|
||||
2. **Provision:** redeploy a staging tenant; `docker ps` shows `molecule-platform-agent` healthy; its
|
||||
logs show a successful `/registry/register`.
|
||||
3. **Identity:** the platform row is `kind='platform'`, `parent_id IS NULL`; the former root now has
|
||||
`parent_id = <platform id>`; `GET /registry/platform-agent` returns it.
|
||||
4. **Reach:** chat the platform agent → it `list_workspaces` then `create_workspace` via the platform
|
||||
MCP and reports back via `send_message_to_user`.
|
||||
5. **Isolation:** it reaches every workspace in its org and **cannot** reach another tenant's
|
||||
workspace.
|
||||
6. **Approval gate:** `delete_workspace` → HTTP 202 pending + approval event; decide-approve →
|
||||
completes; a second delete with the same approval is rejected (consumed).
|
||||
7. Drive a real concierge flow ("spin up a PM + engineer to build X") and watch the delegation/activity
|
||||
ledger.
|
||||
|
||||
---
|
||||
|
||||
*Derived from a read-only multi-agent source audit of `molecule-core`, `molecule-controlplane`,
|
||||
`molecule-ai-workspace-runtime`, `molecule-ai-workspace-template-claude-code`, and
|
||||
`molecule-mcp-server`. No secret values recorded.*
|
||||
@@ -1050,13 +1050,12 @@ def test_reap_continues_on_per_sha_apierror(sr_module, monkeypatch, capsys):
|
||||
|
||||
|
||||
def test_main_soft_skips_when_commit_listing_times_out(sr_module, monkeypatch, capsys):
|
||||
"""A transient outage while listing recent commits fails the tick visibly.
|
||||
"""A transient outage while listing recent commits should not paint main red.
|
||||
|
||||
Per-SHA status read failures are already isolated inside `reap_branch`.
|
||||
The real 2026-05-14 failure was earlier: `/commits?sha=main&limit=30`
|
||||
timed out after all retries, aborting the tick. The next 5-minute tick can
|
||||
retry safely, but the tick itself must be observable as red (exit 1 + error
|
||||
annotation) so the cron bot alerts on persistent infra issues.
|
||||
retry safely, so `main()` should emit an observable warning and return 0.
|
||||
"""
|
||||
|
||||
monkeypatch.setattr(sr_module, "scan_workflows", lambda _: {"workflow-without-push": False})
|
||||
@@ -1069,9 +1068,9 @@ def test_main_soft_skips_when_commit_listing_times_out(sr_module, monkeypatch, c
|
||||
monkeypatch.setattr(sr_module, "list_recent_commit_shas", fake_list_recent_commit_shas)
|
||||
monkeypatch.setattr(sys, "argv", ["status-reaper.py"])
|
||||
|
||||
assert sr_module.main() == 1
|
||||
assert sr_module.main() == 0
|
||||
captured = capsys.readouterr()
|
||||
assert "::error::status-reaper cannot run" in captured.out
|
||||
assert "::warning::status-reaper skipped this tick" in captured.out
|
||||
assert '"skipped": true' in captured.out
|
||||
assert '"skip_reason": "commit-list-api-error"' in captured.out
|
||||
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
// Package approvals holds the single source of truth for which destructive
|
||||
// org operations require a human approval before they execute.
|
||||
//
|
||||
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
//
|
||||
// The org-level platform agent is driven by end-user chat and holds an org-admin
|
||||
// token, so destructive/irreversible operations it can trigger are gated: the
|
||||
// handler creates a pending approval and returns it instead of executing, and a
|
||||
// human decides via the existing approvals subsystem. Keeping the gated-action
|
||||
// list in ONE map makes the blast-radius boundary auditable in a single place —
|
||||
// a handler not listed here behaves exactly as before.
|
||||
package approvals
|
||||
|
||||
// Action is the canonical identifier of a gated destructive operation. The same
|
||||
// string is stored in approval_requests.action so the gate can match a pending/
|
||||
// approved request to the operation being retried.
|
||||
type Action string
|
||||
|
||||
const (
|
||||
ActionDeleteWorkspace Action = "delete_workspace"
|
||||
ActionDeprovision Action = "deprovision_workspace"
|
||||
ActionSecretWrite Action = "secret_write"
|
||||
ActionOrgTokenMint Action = "org_token_mint"
|
||||
)
|
||||
|
||||
// gated is the set of actions that require a human approval. Add an entry here
|
||||
// (and gate the corresponding handler with requireApproval) to expand the
|
||||
// boundary; remove one to drop a gate. This is the only place the policy lives.
|
||||
var gated = map[Action]bool{
|
||||
ActionDeleteWorkspace: true,
|
||||
ActionDeprovision: true,
|
||||
ActionSecretWrite: true,
|
||||
ActionOrgTokenMint: true,
|
||||
}
|
||||
|
||||
// IsGated reports whether action requires a human approval before executing.
|
||||
func IsGated(action Action) bool {
|
||||
return gated[action]
|
||||
}
|
||||
@@ -63,31 +63,6 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionSearch_DBError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("WITH session_items AS").
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-123/session-search?q=test", bytes.NewBufferString(""))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-123"}}
|
||||
|
||||
handler.SessionSearch(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500 on DB error, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Activity List source filter ----------
|
||||
|
||||
func TestActivityList_SourceCanvas(t *testing.T) {
|
||||
|
||||
@@ -1,153 +0,0 @@
|
||||
package handlers
|
||||
|
||||
// approval_gate.go — server-side gate for destructive org operations.
|
||||
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
//
|
||||
// requireApproval is the choke point a destructive handler calls before
|
||||
// executing. It is the trust boundary: the platform-management MCP is a CLIENT
|
||||
// of these handlers, so enforcing here (not in the MCP) means anything holding
|
||||
// an org-admin token still goes through the gate. The flow:
|
||||
//
|
||||
// - if a matching APPROVED + unconsumed approval exists, consume it (single-
|
||||
// use) and let the operation proceed;
|
||||
// - otherwise create (or reuse) a PENDING approval, broadcast it to the canvas
|
||||
// (and escalate to the parent if any), and the handler returns HTTP 202 so a
|
||||
// human can decide. The agent retries after approval and the gate passes.
|
||||
//
|
||||
// Matching is by (workspace_id, action, request_hash) where request_hash is a
|
||||
// stable digest of the operation + its context, so a retried op reuses its own
|
||||
// request instead of flooding the table, and an approval for "delete ws A"
|
||||
// cannot be replayed to "delete ws B".
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// approvalRequestHash is a stable digest of the gated operation. Go's
|
||||
// json.Marshal sorts map keys, so the same context always hashes the same.
|
||||
func approvalRequestHash(workspaceID, action string, contextMap map[string]interface{}) string {
|
||||
cj, err := json.Marshal(contextMap)
|
||||
if err != nil || cj == nil {
|
||||
cj = []byte("{}")
|
||||
}
|
||||
sum := sha256.Sum256([]byte(workspaceID + "\x00" + action + "\x00" + string(cj)))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
// requireApproval returns (approved=true, consumedID) when a matching approval
|
||||
// exists and was just consumed; otherwise it creates/reuses a pending approval
|
||||
// and returns (false, pendingID). A non-nil error is a server error.
|
||||
func requireApproval(ctx context.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) (bool, string, error) {
|
||||
hash := approvalRequestHash(workspaceID, string(action), contextMap)
|
||||
|
||||
// 1. Atomically consume an approved + unconsumed request, if one exists.
|
||||
// The conditional UPDATE ... RETURNING makes consumption race-safe: two
|
||||
// concurrent destructive calls cannot both consume the same approval.
|
||||
var consumedID string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
UPDATE approval_requests SET consumed_at = now()
|
||||
WHERE id = (
|
||||
SELECT id FROM approval_requests
|
||||
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3
|
||||
AND status = 'approved' AND consumed_at IS NULL
|
||||
ORDER BY decided_at DESC NULLS LAST
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`, workspaceID, string(action), hash).Scan(&consumedID)
|
||||
if err == nil {
|
||||
return true, consumedID, nil
|
||||
}
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
return false, "", fmt.Errorf("consume approval: %w", err)
|
||||
}
|
||||
|
||||
// 2. No usable approval — create a pending one, or reuse an existing pending
|
||||
// request for the same operation so retries don't flood the table.
|
||||
cj, mErr := json.Marshal(contextMap)
|
||||
if mErr != nil || cj == nil {
|
||||
cj = []byte("{}")
|
||||
}
|
||||
var approvalID string
|
||||
err = db.DB.QueryRowContext(ctx, `
|
||||
WITH existing AS (
|
||||
SELECT id FROM approval_requests
|
||||
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3 AND status = 'pending'
|
||||
LIMIT 1
|
||||
), ins AS (
|
||||
INSERT INTO approval_requests (workspace_id, action, reason, context, request_hash)
|
||||
SELECT $1, $2, $4, $5::jsonb, $3
|
||||
WHERE NOT EXISTS (SELECT 1 FROM existing)
|
||||
RETURNING id
|
||||
)
|
||||
SELECT id FROM ins UNION ALL SELECT id FROM existing LIMIT 1
|
||||
`, workspaceID, string(action), hash, reason, string(cj)).Scan(&approvalID)
|
||||
if err != nil {
|
||||
return false, "", fmt.Errorf("create approval: %w", err)
|
||||
}
|
||||
|
||||
// Broadcast to the canvas (the user-facing signal). For a platform agent the
|
||||
// parent_id is NULL, so the requested-event on its own workspace IS the user
|
||||
// prompt; ordinary workspaces also escalate to their parent.
|
||||
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
}); bErr != nil {
|
||||
log.Printf("approval_gate: broadcast requested failed (ws=%s): %v", workspaceID, bErr)
|
||||
}
|
||||
var parentID *string
|
||||
if pErr := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); pErr != nil {
|
||||
log.Printf("approval_gate: parent lookup failed (ws=%s): %v", workspaceID, pErr)
|
||||
}
|
||||
if parentID != nil {
|
||||
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
}); bErr != nil {
|
||||
log.Printf("approval_gate: broadcast escalated failed (ws=%s): %v", workspaceID, bErr)
|
||||
}
|
||||
}
|
||||
return false, approvalID, nil
|
||||
}
|
||||
|
||||
// gateDestructive runs requireApproval for a gated action and, when approval is
|
||||
// still pending, writes the 202 response and returns false (caller must stop).
|
||||
// Returns true when the caller may proceed (action consumed an approval).
|
||||
func gateDestructive(c *gin.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) bool {
|
||||
if !approvals.IsGated(action) {
|
||||
return true
|
||||
}
|
||||
approved, approvalID, err := requireApproval(c.Request.Context(), b, workspaceID, action, reason, contextMap)
|
||||
if err != nil {
|
||||
log.Printf("gateDestructive: %v (ws=%s action=%s)", err, workspaceID, action)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "approval gate failed"})
|
||||
return false
|
||||
}
|
||||
if !approved {
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"status": "pending_approval",
|
||||
"approval_id": approvalID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
})
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -1,137 +0,0 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// approval_gate_integration_test.go — REAL Postgres gate for requireApproval.
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run Integration_RequireApproval -v
|
||||
//
|
||||
// Why this is NOT a sqlmock test
|
||||
// ------------------------------
|
||||
// The whole gate is about row state across calls: a pending request is created
|
||||
// once and reused (dedup), an approval is consumed exactly once (single-use via
|
||||
// the conditional UPDATE ... RETURNING), and a different operation context hashes
|
||||
// to a different request. sqlmock returns whatever the stub says; only a real
|
||||
// Postgres proves the consume-once semantics and the partial-index lookup.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/google/uuid"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func TestIntegration_RequireApproval_GateCycle(t *testing.T) {
|
||||
url := requireIntegrationDBURL(t)
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
if err := conn.Ping(); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { conn.Close() })
|
||||
|
||||
// requireApproval + the broadcaster's structure_events write use the db.DB
|
||||
// global; point it at the integration DB and restore afterwards.
|
||||
prev := db.DB
|
||||
db.DB = conn
|
||||
t.Cleanup(func() { db.DB = prev })
|
||||
setupTestRedis(t) // broadcaster publishes to db.RDB; miniredis backs it
|
||||
|
||||
ctx := context.Background()
|
||||
b := newTestBroadcaster()
|
||||
|
||||
wsID := uuid.New().String()
|
||||
t.Cleanup(func() {
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM approval_requests WHERE workspace_id = $1`, wsID)
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE id = $1`, wsID)
|
||||
})
|
||||
// A root workspace (parent_id NULL) — like the platform agent, it has no
|
||||
// parent, so the gate's escalation target is the user/canvas. (This branch
|
||||
// is off main and has no kind column; the gate is kind-agnostic.)
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, status, runtime, parent_id)
|
||||
VALUES ($1, 'Org Concierge', 0, 'online', 'claude-code', NULL)`, wsID); err != nil {
|
||||
t.Fatalf("seed root workspace: %v", err)
|
||||
}
|
||||
|
||||
action := approvals.ActionDeleteWorkspace
|
||||
ctxA := map[string]interface{}{"target": "ws-A"}
|
||||
|
||||
// 1. First call → no approval yet → pending created.
|
||||
ok, id1, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 1: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatal("call 1: approved=true, want false (no approval exists yet)")
|
||||
}
|
||||
|
||||
// 2. Same operation again → must REUSE the same pending row (dedup), not flood.
|
||||
ok, id2, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 2: %v", err)
|
||||
}
|
||||
if ok || id2 != id1 {
|
||||
t.Fatalf("call 2: ok=%v id2=%s, want false and id2==id1(%s) (dedup)", ok, id2, id1)
|
||||
}
|
||||
var nPending int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT count(*) FROM approval_requests WHERE workspace_id=$1 AND status='pending'`, wsID).Scan(&nPending); err != nil {
|
||||
t.Fatalf("count pending: %v", err)
|
||||
}
|
||||
if nPending != 1 {
|
||||
t.Fatalf("pending rows = %d, want 1 (dedup must not flood)", nPending)
|
||||
}
|
||||
|
||||
// 3. A human approves it (simulating the Decide handler).
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`UPDATE approval_requests SET status='approved', decided_by='human', decided_at=now() WHERE id=$1`, id1); err != nil {
|
||||
t.Fatalf("approve: %v", err)
|
||||
}
|
||||
|
||||
// 4. Now the gate consumes the approval and lets the op proceed.
|
||||
ok, consumedID, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 4: %v", err)
|
||||
}
|
||||
if !ok || consumedID != id1 {
|
||||
t.Fatalf("call 4: ok=%v consumedID=%s, want true and id1(%s)", ok, consumedID, id1)
|
||||
}
|
||||
|
||||
// 5. Single-use: the SAME approval cannot be replayed — the next call is
|
||||
// pending again (a fresh request), not approved.
|
||||
ok, id5, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 5: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatal("call 5: approved=true — a consumed approval was replayed")
|
||||
}
|
||||
if id5 == id1 {
|
||||
t.Fatal("call 5: reused the consumed request id; want a new pending request")
|
||||
}
|
||||
|
||||
// 6. Context isolation: an approval for ws-A must not authorize ws-B.
|
||||
// Approve the ws-A request, then a ws-B op must still be pending.
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`UPDATE approval_requests SET status='approved', decided_at=now() WHERE id=$1`, id5); err != nil {
|
||||
t.Fatalf("approve id5: %v", err)
|
||||
}
|
||||
ok, _, err = requireApproval(ctx, b, wsID, action, "delete ws-B", map[string]interface{}{"target": "ws-B"})
|
||||
if err != nil {
|
||||
t.Fatalf("call 6: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatal("call 6: ws-B proceeded on a ws-A approval — context isolation broken")
|
||||
}
|
||||
}
|
||||
@@ -1,46 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestGateDestructive_NonGatedPassesThrough verifies a non-gated action skips
|
||||
// the gate entirely (no DB access, no 202) so handlers whose action isn't in the
|
||||
// policy map behave exactly as before.
|
||||
func TestGateDestructive_NonGatedPassesThrough(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/x", nil)
|
||||
|
||||
proceed := gateDestructive(c, newTestBroadcaster(), "ws-1",
|
||||
approvals.Action("not_a_gated_action"), "noop", nil)
|
||||
|
||||
if !proceed {
|
||||
t.Fatalf("non-gated action must proceed, got proceed=false (status %d)", w.Code)
|
||||
}
|
||||
if w.Code != http.StatusOK { // CreateTestContext default; nothing written
|
||||
t.Errorf("non-gated action wrote a response (status %d), want none", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestApprovalRequestHash_StableAndContextSensitive pins the two properties the
|
||||
// gate relies on: the same operation hashes identically across calls, and a
|
||||
// different context yields a different hash (so an approval can't be replayed
|
||||
// onto a different target).
|
||||
func TestApprovalRequestHash_StableAndContextSensitive(t *testing.T) {
|
||||
a := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "A", "n": 1})
|
||||
aAgain := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"n": 1, "target": "A"})
|
||||
b := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "B", "n": 1})
|
||||
if a != aAgain {
|
||||
t.Errorf("hash not stable across equal contexts: %s vs %s", a, aAgain)
|
||||
}
|
||||
if a == b {
|
||||
t.Errorf("hash not context-sensitive: target A and B collided (%s)", a)
|
||||
}
|
||||
}
|
||||
@@ -602,33 +602,6 @@ func TestDelegationRecord_RejectsInvalidUUID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationRecord_DBInsertFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
h := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnError(fmt.Errorf("connection refused"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"}}
|
||||
body := `{"target_id":"550e8400-e29b-41d4-a716-446655440001","task":"hello","delegation_id":"del-xyz"}`
|
||||
c.Request = httptest.NewRequest("POST", "/delegations/record", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
h.Record(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500 on DB insert failure, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationUpdateStatus_CompletedInsertsResultRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
@@ -337,7 +337,7 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) {
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push", "").
|
||||
WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// DB returns provisioner URL (127.0.0.1) — should take precedence over agent-reported URL
|
||||
|
||||
@@ -180,7 +180,7 @@ func TestRegisterHandler(t *testing.T) {
|
||||
|
||||
// Expect the upsert INSERT ... ON CONFLICT
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push", "").
|
||||
WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Expect the SELECT url query (for cache URL logic)
|
||||
|
||||
@@ -1,122 +0,0 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// kind_platform_root_integration_test.go — REAL Postgres gate for the
|
||||
// platform-agent participant kind (RFC docs/design/rfc-platform-agent.md).
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run Integration_PlatformKind -v
|
||||
//
|
||||
// CI: piggybacks on the handlers-postgres-integration workflow (path filter
|
||||
// includes workspace-server/internal/handlers/** and migrations/**).
|
||||
//
|
||||
// Why this is NOT a sqlmock test
|
||||
// ------------------------------
|
||||
// The invariant "a platform agent must be the org root (parent_id IS NULL),
|
||||
// which structurally also means at most one platform agent per org" is enforced
|
||||
// by the workspaces_platform_root_check CHECK constraint in migration
|
||||
// 20260606000000_workspaces_kind. sqlmock cannot execute DDL or evaluate a CHECK
|
||||
// constraint, so only a real Postgres can prove the constraint actually rejects
|
||||
// a non-root platform agent and accepts a root one. The Register handler's
|
||||
// isPlatformRootViolation()/409 path depends on this constraint firing.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func integrationDB_PlatformKind(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
url := requireIntegrationDBURL(t)
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
if err := conn.Ping(); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { conn.Close() })
|
||||
return conn
|
||||
}
|
||||
|
||||
// TestIntegration_PlatformKind_RootAllowed_NonRootRejected proves the three
|
||||
// guarantees of the kind column against a real Postgres:
|
||||
//
|
||||
// 1. a fresh workspace defaults to kind='workspace';
|
||||
// 2. a root row (parent_id IS NULL) may be kind='platform';
|
||||
// 3. a non-root row (parent_id set) may NOT be kind='platform' — the
|
||||
// workspaces_platform_root_check constraint rejects it (23514).
|
||||
func TestIntegration_PlatformKind_RootAllowed_NonRootRejected(t *testing.T) {
|
||||
conn := integrationDB_PlatformKind(t)
|
||||
ctx := context.Background()
|
||||
|
||||
prefix := fmt.Sprintf("itest-kind-%s", uuid.New().String()[:8])
|
||||
cleanup := func() {
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`DELETE FROM workspaces WHERE name LIKE $1`, prefix+"%"); err != nil {
|
||||
t.Logf("cleanup (non-fatal): %v", err)
|
||||
}
|
||||
}
|
||||
t.Cleanup(cleanup)
|
||||
cleanup() // pre-test hygiene in the shared integration DB
|
||||
|
||||
rootID := uuid.New().String()
|
||||
childID := uuid.New().String()
|
||||
|
||||
// 1. Default kind is 'workspace' when the column is omitted on INSERT.
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status, parent_id)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'online', NULL)
|
||||
`, rootID, prefix+"-root"); err != nil {
|
||||
t.Fatalf("seed root: %v", err)
|
||||
}
|
||||
var gotKind string
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT kind FROM workspaces WHERE id = $1`, rootID).Scan(&gotKind); err != nil {
|
||||
t.Fatalf("read kind: %v", err)
|
||||
}
|
||||
if gotKind != "workspace" {
|
||||
t.Fatalf("default kind = %q, want \"workspace\"", gotKind)
|
||||
}
|
||||
|
||||
// 2. The root row may become a platform agent.
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`UPDATE workspaces SET kind = 'platform' WHERE id = $1`, rootID); err != nil {
|
||||
t.Fatalf("promote root to platform: unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// A child of the platform root (an ordinary workspace) inserts fine.
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status, parent_id)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'online', $3)
|
||||
`, childID, prefix+"-child", rootID); err != nil {
|
||||
t.Fatalf("seed child: %v", err)
|
||||
}
|
||||
|
||||
// 3. The non-root child may NOT be a platform agent — the CHECK rejects it.
|
||||
_, err := conn.ExecContext(ctx,
|
||||
`UPDATE workspaces SET kind = 'platform' WHERE id = $1`, childID)
|
||||
if err == nil {
|
||||
t.Fatalf("non-root child accepted kind='platform' — constraint did not fire")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "workspaces_platform_root_check") {
|
||||
t.Fatalf("non-root platform rejection wanted workspaces_platform_root_check, got: %v", err)
|
||||
}
|
||||
|
||||
// And the unknown-kind value is rejected by workspaces_kind_check.
|
||||
_, err = conn.ExecContext(ctx,
|
||||
`UPDATE workspaces SET kind = 'bogus' WHERE id = $1`, rootID)
|
||||
if err == nil || !strings.Contains(err.Error(), "workspaces_kind_check") {
|
||||
t.Fatalf("unknown kind wanted workspaces_kind_check rejection, got: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -164,20 +164,6 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
|
||||
return models.DeliveryModePush, nil
|
||||
}
|
||||
|
||||
// errPlatformNotRoot is the client-facing message when a register call tried to
|
||||
// mark a non-root workspace as a platform agent.
|
||||
const errPlatformNotRoot = "a platform agent must be the org root (parent_id must be null)"
|
||||
|
||||
// isPlatformRootViolation reports whether err is the DB rejecting a register
|
||||
// that tried to mark a non-root workspace as a platform agent (the
|
||||
// workspaces_platform_root_check CHECK constraint). The handler maps it to a
|
||||
// friendly HTTP 409 instead of a raw 500. The invariant — platform == org root,
|
||||
// which structurally also guarantees one platform agent per org — is enforced
|
||||
// race-proof at the DB level; this is just the friendly surface.
|
||||
func isPlatformRootViolation(err error) bool {
|
||||
return err != nil && strings.Contains(err.Error(), "workspaces_platform_root_check")
|
||||
}
|
||||
|
||||
// Returns a non-nil error suitable for including in a 400 Bad Request response.
|
||||
func validateAgentURL(rawURL string) error {
|
||||
if rawURL == "" {
|
||||
@@ -291,14 +277,6 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Validate explicit kind if the agent declared one; empty is allowed and
|
||||
// resolves to the row's existing value (or "workspace" default) in
|
||||
// resolveKind below. Only the platform-agent container declares 'platform'.
|
||||
if payload.Kind != "" && !models.IsValidKind(payload.Kind) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'workspace' or 'platform'"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// C18: prevent workspace URL hijacking on re-registration.
|
||||
@@ -412,15 +390,9 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
// the row. Without this guard, bulk deletes left tier-3 stragglers because
|
||||
// the last pre-teardown heartbeat flipped status back to 'online' after
|
||||
// Delete's UPDATE.
|
||||
// kind ($6) is the raw payload value (validated above; "" = unspecified).
|
||||
// COALESCE(NULLIF($6,''), …) means: an explicit kind wins; an unspecified
|
||||
// kind defaults to 'workspace' for a NEW row and KEEPS the existing kind on
|
||||
// re-register (so a platform agent re-registering without kind is never
|
||||
// downgraded). A non-root row asking for 'platform' is rejected by the
|
||||
// workspaces_platform_root_check constraint → friendly 409 below.
|
||||
_, err = db.DB.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode, kind)
|
||||
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5, COALESCE(NULLIF($6, ''), 'workspace'))
|
||||
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode)
|
||||
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
url = CASE
|
||||
WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url
|
||||
@@ -430,15 +402,10 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
status = 'online',
|
||||
last_heartbeat_at = now(),
|
||||
delivery_mode = EXCLUDED.delivery_mode,
|
||||
kind = COALESCE(NULLIF($6, ''), workspaces.kind),
|
||||
updated_at = now()
|
||||
WHERE workspaces.status IS DISTINCT FROM 'removed'
|
||||
`, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert, payload.Kind)
|
||||
`, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert)
|
||||
if err != nil {
|
||||
if isPlatformRootViolation(err) {
|
||||
c.JSON(http.StatusConflict, gin.H{"error": errPlatformNotRoot})
|
||||
return
|
||||
}
|
||||
log.Printf("Registry register error: %v (id=%s)", err, payload.ID)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
|
||||
return
|
||||
|
||||
@@ -72,7 +72,7 @@ func TestRegister_DBError(t *testing.T) {
|
||||
|
||||
// DB insert fails
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push", "").
|
||||
WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@@ -647,7 +647,7 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) {
|
||||
// This regex-ish match requires the guard. If the handler ever drops
|
||||
// the clause the test fails because the emitted SQL won't match.
|
||||
mock.ExpectExec("ON CONFLICT.*WHERE workspaces.status IS DISTINCT FROM 'removed'").
|
||||
WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push", "").
|
||||
WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows affected = correctly guarded
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs("ws-resurrect").
|
||||
@@ -917,7 +917,7 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) {
|
||||
|
||||
// Workspace upsert proceeds normally.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push", "").
|
||||
WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
@@ -1228,7 +1228,7 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) {
|
||||
|
||||
// DB upsert fails with a descriptive internal error.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push", "").
|
||||
WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@@ -1476,7 +1476,7 @@ func TestRegister_PollMode_AcceptsEmptyURL(t *testing.T) {
|
||||
|
||||
// Upsert MUST run with empty URL (sql.NullString) and delivery_mode=poll.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll", "").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// SELECT url for cache: returns NULL/empty for poll-mode rows. The
|
||||
@@ -1591,89 +1591,6 @@ func TestRegister_InvalidDeliveryMode(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_InvalidKind rejects payloads that declare an unrecognised kind —
|
||||
// only 'workspace' and 'platform' are valid. Mirrors the delivery_mode guard;
|
||||
// the rejection happens before any DB access.
|
||||
func TestRegister_InvalidKind(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/registry/register",
|
||||
bytes.NewBufferString(`{"id":"ws-x","url":"http://localhost:8000","agent_card":{"name":"a"},"kind":"bogus"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Register(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("invalid kind: expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "kind") {
|
||||
t.Errorf("expected error body to mention kind, got: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_PlatformKind_PersistsKind verifies that a workspace registering
|
||||
// with kind="platform" has that value written through the upsert (the platform
|
||||
// agent self-registers as the org root). The platform==root invariant itself is
|
||||
// enforced by the workspaces_platform_root_check DB constraint and exercised by
|
||||
// the integration test, which sqlmock cannot enforce.
|
||||
func TestRegister_PlatformKind_PersistsKind(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
const wsID = "ws-platform-agent"
|
||||
|
||||
// Bootstrap path — no live tokens.
|
||||
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
// delivery_mode="push" is set explicitly, so resolveDeliveryMode
|
||||
// short-circuits (no SELECT delivery_mode lookup). The upsert MUST carry
|
||||
// kind="platform" as the 6th arg.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, "http://localhost:9100", `{"name":"concierge"}`, "push", "platform").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("http://localhost:9100"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Token issuance — first-register path.
|
||||
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/registry/register",
|
||||
bytes.NewBufferString(`{"id":"`+wsID+`","url":"http://localhost:9100","delivery_mode":"push","kind":"platform","agent_card":{"name":"concierge"}}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Register(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("platform register: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_PollMode_PreservesExistingValue: when the row already
|
||||
// has delivery_mode=poll and the payload doesn't set it, the resolved
|
||||
// mode should be poll — i.e. "absent payload mode" must NOT silently
|
||||
@@ -1699,7 +1616,7 @@ func TestRegister_PollMode_PreservesExistingValue(t *testing.T) {
|
||||
// Upsert carries the resolved poll mode forward — even though
|
||||
// payload didn't restate it. URL still empty (poll-mode shape).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll", "").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
@@ -1768,7 +1685,7 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
|
||||
AddRow(sql.NullString{}, "external"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll", "").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
@@ -1827,7 +1744,7 @@ func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) {
|
||||
AddRow(sql.NullString{}, "kimi-cli"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll", "").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
@@ -1887,7 +1804,7 @@ func TestRegister_NonExternalRuntime_StillDefaultsToPush(t *testing.T) {
|
||||
AddRow(sql.NullString{}, "claude-code"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, "http://localhost:8000", `{"name":"a"}`, "push", "").
|
||||
WithArgs(wsID, wsID, "http://localhost:8000", `{"name":"a"}`, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
|
||||
@@ -13,16 +13,11 @@ import (
|
||||
const DefaultMaxConcurrentTasks = 1
|
||||
|
||||
type Workspace struct {
|
||||
ID string `json:"id" db:"id"`
|
||||
Name string `json:"name" db:"name"`
|
||||
Role sql.NullString `json:"role" db:"role"`
|
||||
Tier int `json:"tier" db:"tier"`
|
||||
Status string `json:"status" db:"status"`
|
||||
// Kind: "workspace" (default) or "platform". A "platform" workspace is the
|
||||
// org-level concierge (the platform agent) that sits at the org root and is
|
||||
// the user's default A2A target. See migration
|
||||
// 20260606000000_workspaces_kind + RFC docs/design/rfc-platform-agent.md.
|
||||
Kind string `json:"kind" db:"kind"`
|
||||
ID string `json:"id" db:"id"`
|
||||
Name string `json:"name" db:"name"`
|
||||
Role sql.NullString `json:"role" db:"role"`
|
||||
Tier int `json:"tier" db:"tier"`
|
||||
Status string `json:"status" db:"status"`
|
||||
SourceBundleID sql.NullString `json:"source_bundle_id" db:"source_bundle_id"`
|
||||
AgentCard json.RawMessage `json:"agent_card" db:"agent_card"`
|
||||
URL sql.NullString `json:"url" db:"url"`
|
||||
@@ -68,21 +63,6 @@ func IsValidDeliveryMode(s string) bool {
|
||||
return s == DeliveryModePush || s == DeliveryModePoll
|
||||
}
|
||||
|
||||
// Workspace kind constants. Matches the CHECK constraint in migration
|
||||
// 20260606000000_workspaces_kind. KindPlatform marks the org-level concierge
|
||||
// (the platform agent) which sits at the org root; see
|
||||
// docs/design/rfc-platform-agent.md.
|
||||
const (
|
||||
KindWorkspace = "workspace"
|
||||
KindPlatform = "platform"
|
||||
)
|
||||
|
||||
// IsValidKind reports whether s is a recognised workspace kind. Empty string is
|
||||
// NOT valid here — callers resolve the default (KindWorkspace) before calling.
|
||||
func IsValidKind(s string) bool {
|
||||
return s == KindWorkspace || s == KindPlatform
|
||||
}
|
||||
|
||||
type RegisterPayload struct {
|
||||
ID string `json:"id" binding:"required"`
|
||||
// URL is required for push-mode workspaces; optional / unused for
|
||||
@@ -96,12 +76,6 @@ type RegisterPayload struct {
|
||||
// value on the workspace row, or default to push for new rows".
|
||||
// When set, must be one of DeliveryModePush / DeliveryModePoll.
|
||||
DeliveryMode string `json:"delivery_mode,omitempty"`
|
||||
// Kind is optional. Empty string means "keep the existing value on the
|
||||
// workspace row, or default to KindWorkspace for new rows". When set, must
|
||||
// be one of KindWorkspace / KindPlatform. KindPlatform additionally requires
|
||||
// the row to be its own org root (parent_id IS NULL) and to be the only
|
||||
// platform agent in the org — enforced by the Register handler.
|
||||
Kind string `json:"kind,omitempty"`
|
||||
}
|
||||
|
||||
type HeartbeatPayload struct {
|
||||
|
||||
@@ -34,35 +34,6 @@ func TestIsValidDeliveryMode_Invalid(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== IsValidKind ====================
|
||||
|
||||
func TestIsValidKind_Valid(t *testing.T) {
|
||||
for _, k := range []string{KindWorkspace, KindPlatform} {
|
||||
if !IsValidKind(k) {
|
||||
t.Errorf("IsValidKind(%q) = false, want true", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsValidKind_Invalid(t *testing.T) {
|
||||
cases := []struct {
|
||||
val string
|
||||
want bool
|
||||
}{
|
||||
{"", false}, // empty is not valid — callers resolve the default
|
||||
{"platforms", false}, // typo
|
||||
{"Platform", false}, // case-sensitive
|
||||
{"platform ", false}, // trailing space
|
||||
{"root", false}, // not a kind
|
||||
{"user", false}, // the user is implicit, not a workspace kind
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := IsValidKind(tc.val); got != tc.want {
|
||||
t.Errorf("IsValidKind(%q) = %v, want %v", tc.val, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== WorkspaceStatus ====================
|
||||
|
||||
func TestWorkspaceStatus_String(t *testing.T) {
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
-- Reverse the participant-kind discriminator.
|
||||
-- Non-destructive: dropping the column makes every workspace an ordinary
|
||||
-- workspace again (the platform agent loses its marker but its row survives).
|
||||
DROP INDEX IF EXISTS idx_workspaces_kind;
|
||||
ALTER TABLE workspaces DROP CONSTRAINT IF EXISTS workspaces_platform_root_check;
|
||||
ALTER TABLE workspaces DROP CONSTRAINT IF EXISTS workspaces_kind_check;
|
||||
ALTER TABLE workspaces DROP COLUMN IF EXISTS kind;
|
||||
@@ -1,45 +0,0 @@
|
||||
-- Participant-kind discriminator for the org-level platform agent.
|
||||
-- (RFC: docs/design/rfc-platform-agent.md)
|
||||
--
|
||||
-- 'workspace' (default) = an ordinary workspace / agent.
|
||||
-- 'platform' = the org-level concierge (the "platform agent"). It is
|
||||
-- the single org root (parent_id IS NULL) and the user's
|
||||
-- default A2A chat target. Exactly one per org.
|
||||
--
|
||||
-- There is no org_id column — an "org" is the parent_id-chain root resolved by
|
||||
-- org_scope.go (orgRootID/sameOrg). "platform == org root" and "one platform
|
||||
-- agent per org" are therefore enforced in the Register/create handlers, not in
|
||||
-- pure SQL. This column is only the discriminator (default-target / billing
|
||||
-- exclusion / UX), defined once here and mirrored by the Go constants
|
||||
-- models.KindWorkspace / models.KindPlatform.
|
||||
--
|
||||
-- Backward-compatible: every existing row defaults to 'workspace'. The CHECK is
|
||||
-- added NOT VALID then validated so the ALTER can never fail on legacy data.
|
||||
ALTER TABLE workspaces
|
||||
ADD COLUMN IF NOT EXISTS kind TEXT NOT NULL DEFAULT 'workspace';
|
||||
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'workspaces_kind_check') THEN
|
||||
ALTER TABLE workspaces
|
||||
ADD CONSTRAINT workspaces_kind_check CHECK (kind IN ('workspace', 'platform')) NOT VALID;
|
||||
ALTER TABLE workspaces VALIDATE CONSTRAINT workspaces_kind_check;
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
-- platform == org root, enforced at the DB level (race-proof). A platform agent
|
||||
-- MUST have parent_id IS NULL. Because an org is the subtree under a single
|
||||
-- parent_id IS NULL root (org_scope.go) and only a root may be 'platform', this
|
||||
-- also structurally guarantees at most ONE platform agent per org. The handler
|
||||
-- additionally pre-checks this to return a friendly 409 instead of a raw 23514.
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'workspaces_platform_root_check') THEN
|
||||
ALTER TABLE workspaces
|
||||
ADD CONSTRAINT workspaces_platform_root_check
|
||||
CHECK (kind <> 'platform' OR parent_id IS NULL) NOT VALID;
|
||||
ALTER TABLE workspaces VALIDATE CONSTRAINT workspaces_platform_root_check;
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_workspaces_kind ON workspaces(kind);
|
||||
@@ -1,5 +0,0 @@
|
||||
-- Reverse the approval-gate single-use/dedup columns.
|
||||
DROP INDEX IF EXISTS approval_requests_gate_idx;
|
||||
ALTER TABLE approval_requests
|
||||
DROP COLUMN IF EXISTS request_hash,
|
||||
DROP COLUMN IF EXISTS consumed_at;
|
||||
@@ -1,18 +0,0 @@
|
||||
-- Single-use + dedup support for the destructive-op approval gate.
|
||||
-- (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
--
|
||||
-- consumed_at: an approval is single-use. Once a destructive op consumes an
|
||||
-- approved request, consumed_at is stamped so the same approval can't be
|
||||
-- replayed for a second destructive call.
|
||||
-- request_hash: a stable hash of (workspace_id, action, context) so a repeated
|
||||
-- destructive attempt matches its own pending/approved request instead of
|
||||
-- flooding the table with duplicates.
|
||||
ALTER TABLE approval_requests
|
||||
ADD COLUMN IF NOT EXISTS consumed_at TIMESTAMPTZ,
|
||||
ADD COLUMN IF NOT EXISTS request_hash TEXT;
|
||||
|
||||
-- Hot path: the gate looks up an approved + unconsumed row matching
|
||||
-- (workspace_id, action, request_hash). Partial index keeps that O(log live).
|
||||
CREATE INDEX IF NOT EXISTS approval_requests_gate_idx
|
||||
ON approval_requests (workspace_id, action, request_hash)
|
||||
WHERE status = 'approved' AND consumed_at IS NULL;
|
||||
Reference in New Issue
Block a user