Compare commits
77 Commits
| SHA1 | Author | Date | |
|---|---|---|---|
| 0f55402459 | |||
| b87e20a559 | |||
| beec3e52b4 | |||
| 9a21ccd96f | |||
| 07e3cefd67 | |||
| 1eaad170df | |||
| 1187d072da | |||
| 6d2b49941f | |||
| b804d16b47 | |||
| e0b6939e11 | |||
| 45dcc665d3 | |||
| a1408cfdd4 | |||
| c90b44fc47 | |||
| a89e2680c9 | |||
| 173881e67a | |||
| bde5421766 | |||
| 2e068c7586 | |||
| a380218234 | |||
| 578b145312 | |||
| a77b6850e2 | |||
| 2f9b5b6704 | |||
| 86df02c38f | |||
| db39d519dc | |||
| 0b771d5770 | |||
| 8a63d16f8c | |||
| 63c25d4c3f | |||
| 116697c576 | |||
| d1c6fce937 | |||
| 0e87fde0a3 | |||
| d5e254f431 | |||
| 3bf1ab18e1 | |||
| 51c2d4d402 | |||
| b4a3553534 | |||
| 7ccd59e630 | |||
| a2d9549de2 | |||
| 06b0556f45 | |||
| d768d8667b | |||
| 99d4a44250 | |||
| 29d15cbe2c | |||
| c88a6b6f58 | |||
| bb14f09ffb | |||
| dd5da6184e | |||
| 3e77146c1e | |||
| dcf9d3cdc1 | |||
| c967edd162 | |||
| b1475b1f71 | |||
| b2d5f88f98 | |||
| 4b9fea49ce | |||
| 31283a292a | |||
| 8ae3cb6917 | |||
| 0e7ff3091f | |||
| 5e5762486e | |||
| dca3ef02cd | |||
| b312083ecd | |||
| bc7c45f3d6 | |||
| bf0db08c7c | |||
| 52ab1cc715 | |||
| 19fd4079cb | |||
| 7b3fc0f2ef | |||
| e441def8b3 | |||
| 51f83260df | |||
| 2fa68b1f23 | |||
| 79be721591 | |||
| 1c07d65561 | |||
| a061a43e8f | |||
| 250cea583f | |||
| fdf0ac1b11 | |||
| 10cf157493 | |||
| dedd55ab37 | |||
| c420dad905 | |||
| 48b6011e17 | |||
| 6dcde313d1 | |||
| 0aa18baecc | |||
| cc99d3fff4 | |||
| c536a1ee97 | |||
| 7a13fa8f9e | |||
| d69b757135 |
+468
-157
@@ -4,18 +4,38 @@
 Gitea 1.22.6+ has auto-merge (`pull_auto_merge`) but no GitHub-style merge
 queue. This script provides the missing serialized policy in user space:

-1. Pick the oldest open PR carrying QUEUE_LABEL (skipping HOLD_LABEL).
-2. Refuse to act unless main's BP-required contexts are green.
+1. Scan open same-repo PRs that are NOT opted out (auto-discovery, see below),
+   oldest-first, skipping drafts, until an ACTIONABLE one is found. A non-ready
+   candidate (REQUEST_CHANGES, mergeable!=True, insufficient genuine approvals,
+   or red required CI) is SKIPPED so it cannot head-of-line block newer ready
+   PRs; the scan continues to the next candidate.
+2. Refuse to act unless main's BP-required contexts are green. This is also
+   the serialized backstop for direct-merge (see below): after a direct merge,
+   main re-runs push CI and this gate PAUSES the queue if main goes red, so no
+   merge piles onto an unverified/red main (issue #2358).
 3. Refuse fork PRs; the queue may only mutate same-repo branches.
-4. If the PR branch does not contain current main, call Gitea's
-   /pulls/{n}/update endpoint and stop. CI must rerun on the updated head.
+4. DIRECT-MERGE when conflict-free (issue #2358). When Gitea reports the PR
+   conflict-free (mergeable is True) and the merge bar below is met, MERGE IT
+   DIRECTLY — even if its head does not contain current main. We do NOT call
+   /pulls/{n}/update first: branch protection does not require strict
+   up-to-date, so behind-main conflict-free PRs merge cleanly, and calling
+   /update would trigger Gitea dismiss_stale_approvals (dismissing the genuine
+   approvals and forcing a re-review every tick — the rebase-churn bottleneck).
+   The /update path is used ONLY when the PR is DEFINITIVELY not mergeable
+   (mergeable is literal False) AND its head lacks current main — refreshing the
+   branch may resolve a behind-main non-conflict; a real conflict returns HTTP
+   409 and the PR is HELD per #2352. mergeable=None/missing (Gitea STILL
+   COMPUTING conflict state) is a distinct fail-closed WAIT: never merged AND
+   never /update'd — calling /update during the compute window would dismiss the
+   PR's genuine approvals (dismiss_stale_approvals) and re-introduce the exact
+   rebase-churn this queue eliminates. None is re-checked next tick.
 5. Merge ONLY when, on the PR's CURRENT head sha:
    - >= REQUIRED_APPROVALS distinct GENUINE official APPROVED reviews from
      the recognised reviewer set (not stale, not dismissed, commit_id ==
      current head), AND
    - no open official REQUEST_CHANGES on the current head, AND
    - every BP-required status context is green, AND
-   - the PR is mergeable.
+   - the PR is mergeable (Gitea reports it conflict-free).

 Authoritative gates (fail-closed):
 - The REQUIRED status contexts come from BRANCH PROTECTION
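The "genuine approval" bar in step 5 can be sketched as a filter over a PR's review list. This is a minimal illustration, not the script's own implementation: the function name `genuine_approvers`, the `recognised` parameter, and the review fields used here (`state`, `official`, `stale`, `dismissed`, `commit_id`, `user.login`) are assumptions modelled on Gitea's pull-review payload.

```python
def genuine_approvers(reviews, head_sha, recognised):
    """Return the recognised logins holding a genuine approval on head_sha.

    Hypothetical helper: field names are assumed, not taken from the script.
    """
    approvers = set()
    for review in reviews:
        login = (review.get("user") or {}).get("login")
        if login not in recognised:
            continue  # reviewer is not in the recognised set
        if review.get("state") != "APPROVED" or not review.get("official"):
            continue  # only official APPROVED reviews count
        if review.get("stale") or review.get("dismissed"):
            continue  # stale or dismissed reviews never count
        if review.get("commit_id") != head_sha:
            continue  # approval was given on an older head
        approvers.add(login)
    return approvers


reviews = [
    {"user": {"login": "alice"}, "state": "APPROVED", "official": True,
     "stale": False, "dismissed": False, "commit_id": "abc123"},
    {"user": {"login": "bob"}, "state": "APPROVED", "official": True,
     "stale": True, "dismissed": False, "commit_id": "abc123"},
    {"user": {"login": "carol"}, "state": "APPROVED", "official": True,
     "stale": False, "dismissed": False, "commit_id": "old999"},
]
print(genuine_approvers(reviews, "abc123", {"alice", "bob", "carol"}))
```

Returning a set of logins makes "distinct" approvals automatic: two approvals from the same reviewer count once when compared against REQUIRED_APPROVALS.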
@@ -29,13 +49,42 @@ Authoritative gates (fail-closed):
   approvals present). It is NEVER used to bypass a failing REQUIRED context
   or missing approvals.

-Head-of-line (HOL) safety: a permanent permission/4xx merge error
-(403/404/405) HOLDS the PR (applies HOLD_LABEL) so the queue advances to the
-next PR instead of re-selecting the same wedged PR every tick. Likewise, a
-persistent branch-update conflict (the /update endpoint returns HTTP 409
-because the PR branch cannot be merged with main without manual rebase) HOLDS
-the PR — a conflict will not self-resolve, so retrying it every tick would
-HOL-block every ready PR behind it (issue #2352).
+Auto-discovery (opt-OUT, label-optional):
+The queue is SELF-SUSTAINING — a ready PR does NOT need a human (or an agent)
+to add the `merge-queue` label first. When AUTO_DISCOVER is on (default), the
+queue enumerates ALL open same-repo PRs and considers any that meets the full
+merge bar (genuine approvals on current head + BP-required green + mergeable +
+no open REQUEST_CHANGES). The merge bar above is UNCHANGED; auto-discovery only
+changes WHICH PRs are considered, not whether they are mergeable.
+
+This deliberately removes the historical dependency on an agent adding the
+`merge-queue` label — agent Gitea tokens lack `write:issue` (labels are
+issue-scoped), so they could never self-label and the queue stalled. The label
+is now OPTIONAL metadata, not a gate.
+
+SAFETY is preserved as opt-OUT: any PR carrying an opt-out label
+(OPT_OUT_LABELS — `merge-queue-hold`, `do-not-auto-merge`, `wip`, `draft` by
+default) is skipped (never auto-considered, never merged). Draft PRs
+(draft=true STATE) are also skipped; the literal `draft` LABEL is an
+additional explicit opt-out a human can apply without converting to a draft.
+A human who wants to keep a PR out of autonomous merging just adds one of
+those labels. Setting AUTO_DISCOVER=0 restores the legacy opt-IN behaviour
+(only PRs already carrying QUEUE_LABEL are considered).
+
+Head-of-line (HOL) safety has two complementary layers:
+  (a) The queue SCANS THROUGH the FIFO candidate list and skips any non-ready
+      PR (REQUEST_CHANGES, mergeable!=True, insufficient genuine approvals, or
+      red required CI) instead of locking on the oldest and waiting, so a PR
+      that can never become ready without human action does not block newer
+      ready PRs.
+  (b) For the candidate the scan acts on, two permanent failure modes HOLD the
+      PR (apply HOLD_LABEL) and let the scan CONTINUE to the next candidate
+      rather than re-selecting the same wedged PR every tick:
+      - a permanent permission/4xx merge error (403/404/405), and
+      - a persistent branch-update conflict (the /update endpoint returns
+        HTTP 409 because the PR branch cannot be merged with main without a
+        manual rebase). A conflict will not self-resolve, so retrying it
+        every tick would HOL-block every ready PR behind it (issue #2352).

 Status-fetch is fail-closed: if the combined status for a sha cannot be
 fetched, the PR is skipped this tick (never treated as green).
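Layer (a) of the head-of-line safety described above can be illustrated with a toy scan. This is a sketch under stated assumptions, not the script's `process_once`: the function `scan` and the `evaluate` callback are hypothetical, and each candidate is reduced to a PR number with a precomputed action.

```python
def scan(candidates, evaluate):
    """Scan FIFO-ordered candidates; return (acted_on, skipped).

    Hypothetical helper: `evaluate` maps a candidate to an action string
    such as "wait", "update", or "merge".
    """
    skipped = []
    for pr in candidates:
        action = evaluate(pr)
        if action == "wait":
            skipped.append(pr)  # non-ready: skip it, do not block newer PRs
            continue
        return pr, skipped  # first actionable candidate ends the scan
    return None, skipped


actions = {101: "wait", 102: "wait", 103: "merge", 104: "merge"}
acted, skipped = scan([101, 102, 103, 104], actions.get)
print(acted, skipped)
```

In this toy run the two oldest PRs are passed over and #103 is acted on. The newer #104 is left for a later tick, so the scan still acts on at most one PR.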
@@ -68,6 +117,33 @@ WATCH_BRANCH = _env("WATCH_BRANCH", default="main")
 QUEUE_LABEL = _env("QUEUE_LABEL", default="merge-queue")
 HOLD_LABEL = _env("HOLD_LABEL", default="merge-queue-hold")
 UPDATE_STYLE = _env("UPDATE_STYLE", default="merge")
+# Auto-discovery (opt-OUT). When truthy (default), the queue considers ALL open
+# same-repo PRs that meet the merge bar, not only PRs already carrying
+# QUEUE_LABEL — so the queue is self-sustaining without any human/agent labeling
+# (agent tokens lack write:issue and cannot self-label). Set AUTO_DISCOVER=0 to
+# restore the legacy opt-IN behaviour (QUEUE_LABEL required to be considered).
+AUTO_DISCOVER = _env("AUTO_DISCOVER", default="1").strip().lower() not in {
+    "0",
+    "false",
+    "no",
+    "off",
+    "",
+}
+# Opt-OUT labels. A PR carrying ANY of these is skipped (never auto-considered,
+# never merged) — the human escape hatch from autonomous merging. HOLD_LABEL is
+# always included so the existing hold semantics keep working. `do-not-auto-merge`
+# and `wip` let a human keep a PR out of the auto-merge path without removing it.
+# `draft` is included as a literal label too: Gitea draft STATE (draft=true) is
+# already skipped via _issue_is_draft, but a "draft" LABEL is an additional,
+# explicit opt-out signal a human can apply without converting the PR to a draft.
+OPT_OUT_LABELS = {
+    name.strip()
+    for name in _env(
+        "OPT_OUT_LABELS",
+        default="do-not-auto-merge,wip,draft",
+    ).split(",")
+    if name.strip()
+} | ({HOLD_LABEL} if HOLD_LABEL else set())
 REQUIRED_CONTEXTS_RAW = _env(
     "REQUIRED_CONTEXTS",
     default=(
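The parsing rules for `AUTO_DISCOVER` and `OPT_OUT_LABELS` above can be checked in isolation. The sketch below pulls the two expressions into standalone functions. The names `parse_flag` and `parse_labels` are hypothetical and exist only for illustration; the script itself reads the values through its `_env` helper.

```python
def parse_flag(raw):
    """True unless the value is one of the recognised 'off' spellings."""
    return raw.strip().lower() not in {"0", "false", "no", "off", ""}


def parse_labels(raw, hold_label):
    """Split a comma-separated label list and always include the hold label."""
    labels = {name.strip() for name in raw.split(",") if name.strip()}
    return labels | ({hold_label} if hold_label else set())


print(parse_flag("1"), parse_flag(" OFF "), parse_flag(""))
print(sorted(parse_labels("do-not-auto-merge, wip,,draft", "merge-queue-hold")))
```

Two consequences follow from the expressions as written: an empty `AUTO_DISCOVER` value turns auto-discovery off, and any value outside the listed spellings (for example `yes` or `2`) leaves it on.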
@@ -208,6 +284,34 @@ def api(
         return status, {"_raw": raw.decode("utf-8", errors="replace")}


+def api_paginated(
+    method: str,
+    path: str,
+    *,
+    query: dict[str, str] | None = None,
+    page_size: int = 50,
+) -> list[dict]:
+    """Fetch all pages of a paginated Gitea list endpoint.
+
+    Gitea paginates with `page` (1-indexed) and `limit`. We loop until a
+    page returns fewer than `page_size` items, indicating the end.
+    """
+    results: list[dict] = []
+    page = 1
+    while True:
+        page_query = dict(query or {})
+        page_query["page"] = str(page)
+        page_query["limit"] = str(page_size)
+        _, body = api(method, path, query=page_query)
+        if not isinstance(body, list):
+            raise ApiError(f"{path} paginated response not list")
+        results.extend(body)
+        if len(body) < page_size:
+            break
+        page += 1
+    return results
+
+
 def required_contexts(raw: str) -> list[str]:
     return [part.strip() for part in raw.split(",") if part.strip()]

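The termination rule of `api_paginated` (stop on the first short page) can be exercised without a server. The sketch below is an assumption-laden stand-in: `fetch_all` mirrors the loop, while `make_stub` is a hypothetical in-memory page source that records which pages were requested.

```python
def fetch_all(fetch_page, page_size=50):
    """Collect items page by page until a page comes back short."""
    results = []
    page = 1
    while True:
        body = fetch_page(page, page_size)
        results.extend(body)
        if len(body) < page_size:
            break  # a short (or empty) page marks the end
        page += 1
    return results


def make_stub(total):
    """Hypothetical page source serving `total` integers, 1-indexed pages."""
    items = list(range(total))
    calls = []

    def fetch_page(page, limit):
        calls.append(page)
        start = (page - 1) * limit
        return items[start:start + limit]

    return fetch_page, calls


fetch_page, calls = make_stub(100)
print(len(fetch_all(fetch_page, page_size=50)), calls)
```

When the total is an exact multiple of the page size, the loop needs one extra request that returns an empty page. The rule also assumes the server honours the requested `limit`: if the server caps pages below `page_size`, a full page would look short and the loop would stop early.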
@@ -240,17 +344,18 @@ def _is_tier_low_pending_ok(
 ) -> bool:
     """Return True if tier:low PR can tolerate sop-checklist pending state.

-    Per sop-checklist-config.yaml tier_failure_mode, tier:low uses soft-fail:
-    sop-checklist posts state=pending when acks are satisfied (missing
-    manager/ceo acks are informational only). The queue should accept
-    pending instead of waiting for success.
+    GENERIC PENDING-AS-GREEN REMOVED (Researcher + CR2 RC on #2368):
+    The prior soft-fail accepted ANY pending sop-checklist for tier:low,
+    which allowed required checks to pass without genuine verification.
+    Pending required sop-checklist must now always HOLD and appear in
+    missing_or_bad. This function is retained as a policy hook but
+    currently always returns False so pending never counts green.
+
+    If a positively identifiable genuine soft-fail state is defined in
+    future (e.g., a specific check-run conclusion), implement it here
+    with strict positive identification — never default to pass.
     """
-    if "tier:low" not in pr_labels:
-        return False
-    if "sop-checklist" not in context:
-        return False
-    status = latest_statuses.get(context) or {}
-    return status_state(status) == "pending"
+    return False


 def required_contexts_green(
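The effect of this change is that only an explicit success counts as green for a required context. The sketch below shows that fail-closed rule in its simplest form. It is not the script's `required_contexts_green` (whose body is not shown in this diff): the name `contexts_green` and the plain context-to-state mapping are assumptions made for illustration.

```python
def contexts_green(latest_states, required):
    """Return (ok, missing_or_bad); anything other than 'success' is bad.

    Hypothetical helper: `latest_states` maps context name -> state string.
    """
    missing_or_bad = []
    for context in required:
        state = latest_states.get(context, "missing")
        if state != "success":
            missing_or_bad.append(f"{context}={state}")
    return not missing_or_bad, missing_or_bad


latest = {"ci/build": "success", "sop-checklist": "pending"}
print(contexts_green(latest, ["ci/build", "sop-checklist"]))
```

A pending or absent required context therefore lands in `missing_or_bad`, which matches the docstring's statement that pending must always hold the PR.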
@@ -410,6 +515,85 @@ def choose_next_queued_issue(
     return candidates[0] if candidates else None


+def _issue_is_draft(issue: dict) -> bool:
+    """True if the issue/PR is a draft.
+
+    The /issues listing exposes draft state under the `pull_request` sub-object
+    (`{"draft": true}`); some Gitea versions also surface a top-level `draft`.
+    Either is honoured. Drafts are never auto-considered for merging.
+    """
+    pr = issue.get("pull_request")
+    if isinstance(pr, dict) and pr.get("draft") is True:
+        return True
+    return issue.get("draft") is True
+
+
+def choose_candidate_issues(
+    issues: list[dict],
+    *,
+    queue_label: str,
+    opt_out_labels: set[str],
+    auto_discover: bool,
+) -> list[dict]:
+    """All open PRs eligible for a merge attempt this tick, oldest-first.
+
+    This is the auto-discovery selector. It does NOT change the merge bar — it
+    only changes WHICH PRs are considered:
+
+    - auto_discover=True (default): every open same-repo PR is a candidate,
+      EXCEPT those carrying an opt-out label or marked draft. The QUEUE_LABEL
+      is optional metadata, not a gate, so a ready PR reaches the queue with no
+      human/agent labeling (the write:issue gap is removed).
+    - auto_discover=False: legacy opt-IN — only PRs carrying queue_label are
+      candidates (still skipping opt-out labels and drafts).
+
+    Opt-out is the safety escape hatch: any opt_out_labels member present skips
+    the PR entirely (never considered, never merged). Ordering is oldest-first
+    (created_at, then number) to preserve the serialized FIFO ordering.
+
+    Returns the FULL ordered list (not just the head) so process_once can SCAN
+    THROUGH non-ready candidates instead of locking on the oldest. A non-ready
+    auto-discovered PR (e.g. one with REQUEST_CHANGES or mergeable=false, which
+    can never become ready without human action) must NOT head-of-line block the
+    newer ready PRs behind it — the readiness check happens per-candidate in
+    process_once, and a `wait` candidate is skipped to the next one.
+    """
+    candidates = []
+    for issue in issues:
+        if "pull_request" not in issue:
+            continue
+        labels = label_names(issue)
+        if opt_out_labels & labels:
+            continue  # opt-out: human kept this PR out of autonomous merging
+        if _issue_is_draft(issue):
+            continue  # drafts are never auto-merged
+        if not auto_discover and queue_label not in labels:
+            continue  # legacy opt-IN: require the queue label
+        candidates.append(issue)
+    candidates.sort(key=lambda issue: (issue.get("created_at") or "", int(issue["number"])))
+    return candidates
+
+
+def choose_next_candidate_issue(
+    issues: list[dict],
+    *,
+    queue_label: str,
+    opt_out_labels: set[str],
+    auto_discover: bool,
+) -> dict | None:
+    """The oldest eligible candidate, or None. Thin head-of-list wrapper around
+    choose_candidate_issues; retained for callers/tests that only want the head.
+    process_once uses the full list (choose_candidate_issues) so it can scan past
+    non-ready PRs rather than HOL-block on the oldest."""
+    candidates = choose_candidate_issues(
+        issues,
+        queue_label=queue_label,
+        opt_out_labels=opt_out_labels,
+        auto_discover=auto_discover,
+    )
+    return candidates[0] if candidates else None
+
+
 def pr_contains_base_sha(commits: list[dict], base_sha: str) -> bool:
     for commit in commits:
         sha = commit.get("sha") or commit.get("id")
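The FIFO ordering used by `choose_candidate_issues` sorts on the pair (`created_at`, `number`). The standalone sketch below applies the same key to sample data. The wrapper name `fifo_order` and the sample issues are hypothetical, and the example assumes all timestamps share one format and UTC offset, since the comparison is a plain string comparison.

```python
def fifo_order(issues):
    """Sort oldest-first by created_at string, breaking ties by PR number."""
    return sorted(
        issues,
        key=lambda issue: (issue.get("created_at") or "", int(issue["number"])),
    )


issues = [
    {"number": 12, "created_at": "2024-05-02T10:00:00Z"},
    {"number": 9, "created_at": "2024-05-02T10:00:00Z"},
    {"number": 30, "created_at": "2024-04-30T08:15:00Z"},
    {"number": 7},  # no created_at: the empty-string fallback sorts first
]
print([issue["number"] for issue in fifo_order(issues)])
```

One property worth noticing is that an issue with a missing `created_at` falls back to the empty string and therefore moves to the front of the queue rather than the back.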
@@ -454,29 +638,32 @@ def evaluate_merge_readiness(
     approvers: set[str],
     request_changes: list[str],
     pr_has_current_base: bool,
-    mergeable: bool,
+    mergeable: bool | None,
     pr_labels: set[str] | None = None,
 ) -> MergeDecision:
     # 1) Main's push-required contexts must be green. Combined state can be
     # "failure" due to non-blocking jobs (continue-on-error: true) that do
     # not gate merges, so check the explicit required set, not combined.
+    #
+    # This main-green gate is ALSO the serialized backstop that makes the
+    # direct-merge (no update) path safe (issue #2358): after a direct merge
+    # of a behind-main PR, main re-runs its push CI; if a semantic main-break
+    # slips through (PR green standalone but broken when combined with newer
+    # main), main's required contexts go red and this gate PAUSES the queue —
+    # no further merge piles onto an unverified/red main until it is green.
     main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
     main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
     if not main_ok:
         return MergeDecision(False, "pause", "main required contexts not green: " + ", ".join(main_bad))

-    # 2) PR head must contain current main.
-    if not pr_has_current_base:
-        return MergeDecision(False, "update", "PR head does not contain current main")
-
-    # 3) No open official REQUEST_CHANGES on the current head.
+    # 2) No open official REQUEST_CHANGES on the current head.
     if request_changes:
         return MergeDecision(
             False, "wait",
             "open REQUEST_CHANGES on current head from: " + ", ".join(sorted(request_changes)),
         )

-    # 4) Enough distinct genuine official approvals on the current head.
+    # 3) Enough distinct genuine official approvals on the current head.
     if len(approvers) < required_approvals:
         return MergeDecision(
             False, "wait",
@@ -485,7 +672,7 @@ def evaluate_merge_readiness(
             f"need {required_approvals}",
         )

-    # 5) Every BRANCH-PROTECTION-REQUIRED status context must be green. This is
+    # 4) Every BRANCH-PROTECTION-REQUIRED status context must be green. This is
     # the authoritative status gate — NON-required reds (qa-review,
     # security-review, sop-tier/sop-checklist when not BP-required, E2E Chat,
     # Staging SaaS, ci-arm64-advisory, continue-on-error jobs) are NOT
@@ -495,16 +682,53 @@ def evaluate_merge_readiness(
     if not ok:
         return MergeDecision(False, "wait", "required contexts not green: " + ", ".join(missing_or_bad))

-    # 6) Gitea must consider the PR mergeable (no conflicts).
-    if not mergeable:
-        return MergeDecision(False, "wait", "PR is not mergeable (conflicts)")
+    # 5) DIRECT-MERGE when conflict-free (issue #2358 — throughput fix).
+    # If Gitea reports the PR conflict-free (mergeable is True), MERGE IT
+    # DIRECTLY even if its head does not yet contain current main. Branch
+    # protection does NOT require strict up-to-date, so a behind-main but
+    # conflict-free PR merges cleanly. We deliberately do NOT call
+    # /pulls/{n}/update first: update triggers Gitea dismiss_stale_approvals,
+    # which would dismiss the PR's genuine approvals and force a full
+    # re-review every tick — the rebase-churn bottleneck that collapsed
+    # throughput to ~0/hr with dozens of mergeable PRs open.
+    #
+    # The merge bar is UNCHANGED: we only reach here with main green +
+    # >= required genuine approvals on the current head + no open
+    # REQUEST_CHANGES + every BP-required context green. The trade-off is
+    # that the PR's CI ran on a possibly-behind base, so a SEMANTIC main-break
+    # is caught by POST-merge main CI (step 1's pause backstop) rather than
+    # pre-merge. force_merge is used ONLY for missing-but-non-required
+    # governance reds (required are green + approvals genuine), never to
+    # bypass a failing required context or an approval shortfall.
+    if mergeable is True:
+        force = _non_required_red_present(latest, required_contexts)
+        return MergeDecision(True, "merge", "ready", force=force)

-    # Ready. Use force_merge ONLY if the merge would otherwise be blocked by
-    # missing-but-non-required governance contexts. Required are green and
-    # approvals are genuine, so force only bypasses non-required reds — never a
-    # failing required context or missing approval.
-    force = _non_required_red_present(latest, required_contexts)
-    return MergeDecision(True, "merge", "ready", force=force)
+    # 6) NOT (yet) mergeable. TRI-STATE, fail-closed — never merge on an unknown.
+    # We MUST distinguish "still computing" (None/missing) from a "definitive
+    # conflict" (False); collapsing them would route a behind-main but
+    # STILL-COMPUTING PR into the /update path, whose dismiss_stale_approvals
+    # is the rebase-churn this change eliminates.
+    #
+    # mergeable is None → Gitea has NOT finished computing conflict state.
+    # WAIT: do nothing this tick — never /update (would dismiss genuine
+    # approvals during the compute window → churn), never merge. Re-check next
+    # tick once Gitea reports a decisive True/False.
+    if mergeable is None:
+        return MergeDecision(
+            False, "wait",
+            "PR mergeability is still being computed (mergeable=None) — waiting",
+        )
+
+    # mergeable is False → DEFINITIVE not-mergeable. If the head also does not
+    # contain current main, try the /update path to refresh the branch (this
+    # may resolve a behind-main non-conflict; a real conflict returns HTTP 409
+    # and process_once HOLDs the PR per #2352). If the head already contains
+    # current main yet Gitea still reports not-mergeable, there is nothing the
+    # queue can do (genuine conflict against current main) — WAIT.
+    if not pr_has_current_base:
+        return MergeDecision(False, "update", "PR not mergeable and head does not contain current main")
+    return MergeDecision(False, "wait", "PR is not mergeable (conflicts)")


 def get_branch_head(branch: str) -> str:
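The tri-state handling of `mergeable` in steps 5 and 6 reduces to a small decision table once the earlier gates (main green, approvals, no REQUEST_CHANGES, required contexts green) have passed. The sketch below enumerates it. The function name `route` is hypothetical, and it returns only the action string rather than a `MergeDecision`.

```python
def route(mergeable, has_current_base):
    """Map (mergeable, has_current_base) to the queue action.

    Assumes every earlier gate has already passed.
    """
    if mergeable is True:
        return "merge"  # conflict-free: merge directly, even if behind main
    if mergeable is None:
        return "wait"  # still computing: never merge, never update
    if not has_current_base:
        return "update"  # definitively not mergeable and behind main
    return "wait"  # up to date but conflicting: needs a human


for mergeable in (True, None, False):
    for has_base in (True, False):
        print(mergeable, has_base, route(mergeable, has_base))
```

Only one of the six combinations reaches the `update` path, which is what keeps approval-dismissing branch updates rare. The identity checks (`is True`, `is None`) matter here: a missing `mergeable` key read with `dict.get` yields `None` and waits instead of merging.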
@@ -520,32 +744,23 @@ def get_combined_status(sha: str) -> dict:
     """Combined status + all individual statuses for `sha`.

     The /status endpoint caps the `statuses` array at 30 entries (Gitea
-    default page size), so we fetch the full list via /statuses with a
-    higher limit. The combined `state` still comes from /status.
+    default page size), so we fetch the full list via /statuses. The combined
+    `state` still comes from /status.

-    Fail-closed: the PRIMARY /status fetch must succeed. If it raises, the
-    error propagates so the caller skips this PR this tick (we never treat a
-    failed status fetch as green — dev-sop "no fail-open"). Only the SECONDARY
-    /statuses enrichment (which merely extends the per-context list beyond the
-    30-entry cap) is best-effort; if it fails we still have the combined set.
+    Fail-closed: BOTH the PRIMARY /status fetch AND the SECONDARY /statuses
+    enrichment must succeed. If either raises, the error propagates so the
+    caller skips this PR this tick (we never treat a failed status fetch as
+    green — dev-sop "no fail-open"). A paginated /statuses error must NOT
+    silently degrade to an incomplete status set.
     """
     _, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
     if not isinstance(combined, dict):
         raise ApiError(f"status for {sha} response not object")
     combined_statuses: list[dict] = combined.get("statuses") or []
-    try:
-        _, all_statuses_raw = api(
-            "GET",
-            f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
-            query={"limit": "50"},
-        )
-        if isinstance(all_statuses_raw, list):
-            all_statuses: list[dict] = list(all_statuses_raw)
-        else:
-            all_statuses = []
-    except (ApiError, urllib.error.URLError, TimeoutError, OSError) as exc:
-        sys.stderr.write(f"::warning::could not fetch full statuses list for {sha[:8]}: {exc}\n")
-        all_statuses = []
+    all_statuses = api_paginated(
+        "GET",
+        f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
+    )
     # Build latest per context: process combined (ascending→reverse=newest
     # first), then fill gaps from all_statuses (already newest-first).
     latest: dict[str, dict] = {}
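The closing comment of this hunk describes how the latest status per context is built from the two lists. The remainder of the function is not shown in the diff, so the following is only a guess at the shape of that step: the name `latest_by_context` and the sample data are hypothetical, and the ordering of each list (combined ascending, full list newest-first) is taken from the comment rather than verified.

```python
def latest_by_context(combined_statuses, all_statuses):
    """Keep the newest status per context, preferring the combined list."""
    latest = {}
    # Combined list is assumed oldest-first, so walk it in reverse.
    for status in reversed(combined_statuses):
        latest.setdefault(status["context"], status)
    # Full list is assumed newest-first; it only fills missing contexts.
    for status in all_statuses:
        latest.setdefault(status["context"], status)
    return latest


combined = [
    {"context": "ci", "state": "failure", "id": 1},
    {"context": "ci", "state": "success", "id": 2},
]
full = [
    {"context": "lint", "state": "success", "id": 4},
    {"context": "lint", "state": "pending", "id": 3},
    {"context": "ci", "state": "failure", "id": 1},
]
print({ctx: s["id"] for ctx, s in latest_by_context(combined, full).items()})
```

Using `setdefault` means the first status seen for a context wins, so correctness depends entirely on visiting each list newest-first.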
@@ -562,19 +777,36 @@ def get_combined_status(sha: str) -> dict:


 def list_queued_issues() -> list[dict]:
-    _, body = api(
+    return api_paginated(
         "GET",
         f"/repos/{OWNER}/{NAME}/issues",
         query={
             "state": "open",
             "type": "pulls",
             "labels": QUEUE_LABEL,
-            "limit": "50",
         },
     )
-    if not isinstance(body, list):
-        raise ApiError("queued issues response not list")
-    return body


+def list_candidate_issues(*, auto_discover: bool) -> list[dict]:
+    """Open PR issues eligible for consideration this tick.
+
+    With auto_discover=True (default) this enumerates ALL open PRs (no label
+    filter) so the queue is self-sustaining — a ready PR is considered without
+    any human/agent first adding QUEUE_LABEL. With auto_discover=False it falls
+    back to the legacy label-filtered listing (opt-IN). Opt-out filtering and
+    draft-skipping happen in choose_next_candidate_issue, not here.
+    """
+    if not auto_discover:
+        return list_queued_issues()
+    return api_paginated(
+        "GET",
+        f"/repos/{OWNER}/{NAME}/issues",
+        query={
+            "state": "open",
+            "type": "pulls",
+        },
+    )
+
+
 def get_pull(pr_number: int) -> dict:
@@ -731,45 +963,189 @@ def process_once(*, dry_run: bool = False) -> int:
         print(f"::notice::queue paused: {WATCH_BRANCH}@{main_sha[:8]} required contexts not green: {', '.join(main_bad)}")
         return 0

-    issue = choose_next_queued_issue(
-        list_queued_issues(),
+    candidates = choose_candidate_issues(
+        list_candidate_issues(auto_discover=AUTO_DISCOVER),
         queue_label=QUEUE_LABEL,
-        hold_label=HOLD_LABEL,
+        opt_out_labels=OPT_OUT_LABELS,
+        auto_discover=AUTO_DISCOVER,
     )
-    if not issue:
-        print("::notice::merge queue empty")
+    if not candidates:
+        print(
+            "::notice::no merge candidates "
+            f"(auto_discover={'on' if AUTO_DISCOVER else 'off'})"
+        )
         return 0

+    # HOL fix: SCAN THROUGH the FIFO candidate list until a PR we can ACT on is
+    # found, instead of locking on the oldest and waiting. A non-ready candidate
+    # (decision.action == "wait": REQUEST_CHANGES, mergeable!=True, insufficient
+    # genuine approvals, or red required CI) is SKIPPED — it must NOT head-of-line
+    # block the newer ready PRs behind it. The merge bar is unchanged: a skipped
+    # PR is never merged, and the first ACTIONABLE candidate (an "update" that
+    # advances a stale branch, or a fully-ready "merge") terminates the scan.
+    #
+    # `update` is treated as actionable, not skippable: a PR whose head merely
+    # lacks current main is in a legitimate in-progress state (updating it +
+    # rerunning CI moves it toward ready), unlike a PR that can never become
+    # ready without a human (RC / conflict), which is a `wait` and gets skipped.
+    for issue in candidates:
+        decision, ctx = _evaluate_candidate(
+            issue,
+            main_sha=main_sha,
+            main_status=main_status,
+            required_contexts=contexts,
+            required_approvals=required_approvals,
+            dry_run=dry_run,
+        )
+        if decision is None:
+            continue  # not merge-eligible (not-open / opted-out / fork / wrong base)
+        pr_number = ctx["pr_number"]
+        print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
+        if decision.action == "wait":
+            # Non-ready: skip to the next candidate (no HOL block, no merge).
+            continue
+        if decision.action == "update":
+            try:
+                update_pull(pr_number, dry_run=dry_run)
+            except BranchUpdateConflictError as exc:
+                # The branch cannot be updated with main because of a real
+                # conflict (HTTP 409 from /update). This is the #2352 HOL guard:
+                # a conflict will not self-resolve without a human/agent rebase,
+                # so re-attempting the update every tick would head-of-line block
+                # every ready PR behind it. HOLD this PR (apply HOLD_LABEL, which
+                # is an opt-out label so later ticks skip it) and CONTINUE the
+                # scan so a newer ready PR can still merge this tick. Fail-closed:
+                # a held PR is skipped, never merged.
+                sys.stderr.write(
+                    f"::error::branch-update conflict for PR #{pr_number}: {exc}\n"
+                )
+                hold_note = (
+                    "merge-queue: could not update this branch with "
+                    f"`{WATCH_BRANCH}` — the update returned a merge conflict "
+                    f"(HTTP 409) that the queue cannot auto-resolve ({exc}). "
+                    f"Applied `{HOLD_LABEL}` to unblock the queue (HOL guard). "
+                    f"Fix: rebase/merge `{WATCH_BRANCH}` into this branch and "
+                    f"resolve the conflicts, then remove `{HOLD_LABEL}` to requeue."
+                )
+                hold_pr(pr_number, hold_note, dry_run=dry_run)
+                continue  # held — keep scanning for a mergeable candidate
+            post_comment(
+                pr_number,
+                (
+                    f"merge-queue: updated this branch with `{WATCH_BRANCH}` at "
+                    f"`{main_sha[:12]}`. Waiting for CI on the refreshed head."
+                ),
+                dry_run=dry_run,
+            )
+            return 0
+        if decision.ready:
+            latest_main_sha = get_branch_head(WATCH_BRANCH)
+            if latest_main_sha != main_sha:
+                print(
+                    f"::notice::main moved {main_sha[:8]} -> {latest_main_sha[:8]}; "
+                    "deferring to next tick"
+                )
+                return 0
+            try:
+                merge_pull(pr_number, dry_run=dry_run, force=decision.force)
+            except MergePermissionError as exc:
+                # Permanent merge failure (HTTP 403/404/405). HOLD this PR by
+                # applying HOLD_LABEL (it becomes an opt-out label, so subsequent
+                # ticks skip it) and CONTINUE scanning so the queue still advances
+                # to the next ready PR this tick rather than stalling.
+                sys.stderr.write(f"::error::merge permission error for PR #{pr_number}: {exc}\n")
+                hold_note = (
+                    "merge-queue: merge failed with a permanent permission error "
+                    f"({exc}). No available token has Can-merge permission for this "
+                    f"PR. Applied `{HOLD_LABEL}` to unblock the queue (HOL guard). "
+                    f"Fix: grant Can-merge to the queue token, then remove "
+                    f"`{HOLD_LABEL}` to requeue."
+                )
+                try:
+                    add_label_by_name(pr_number, HOLD_LABEL, dry_run=dry_run)
+                except ApiError as label_exc:
+                    # If we cannot even apply the hold label, fall back to a comment
+                    # so the wedge is at least visible; do NOT loop on this PR.
+                    sys.stderr.write(
+                        f"::error::could not apply HOLD_LABEL to PR #{pr_number}: {label_exc}\n"
+                    )
+                    hold_note += (
+                        f"\n\n(NOTE: could not apply the hold label automatically: "
+                        f"{label_exc}. Please add `{HOLD_LABEL}` manually.)"
+                    )
+                post_comment(pr_number, hold_note, dry_run=dry_run)
+                continue  # held — keep scanning for a mergeable candidate
+            return 0
+    return 0
+
+
+def _evaluate_candidate(
+    issue: dict,
+    *,
+    main_sha: str,
+    main_status: dict,
+    required_contexts: list[str],
+    required_approvals: int,
+    dry_run: bool,
+) -> tuple[MergeDecision | None, dict]:
+    """Evaluate a single auto-discovered candidate against the full merge bar.
+
+    Returns (decision, ctx) where ctx carries {"pr_number"}. A None decision
+    means the PR is not merge-eligible at all (not open / opted-out / draft /
+    fork / wrong base) and the caller should skip to the next candidate; for
+    fork / wrong-base the explanatory comment is posted here before returning.
+
+    The merge bar is UNCHANGED from the single-PR path — this only factors the
+    per-PR evaluation out so process_once can scan multiple candidates. A failed
+    status fetch still raises (fail-closed): it propagates to the caller so the
+    PR is never treated as green.
+    """
     pr_number = int(issue["number"])
+    ctx = {"pr_number": pr_number}
     pr = get_pull(pr_number)
     if pr.get("state") != "open":
         print(f"::notice::PR #{pr_number} is not open; skipping")
-        return 0
+        return None, ctx
+    # Defensive opt-out/draft re-check on the authoritative pull payload: the
+    # /issues listing's label/draft view can lag, but the merge bar must respect
+    # the live pull state. (choose_candidate_issues already filtered on the
+    # listing; this guards against a stale listing racing a just-added opt-out.)
+    if OPT_OUT_LABELS & label_names(pr):
+        print(f"::notice::PR #{pr_number} carries an opt-out label; skipping")
+        return None, ctx
+    if pr.get("draft") is True:
+        print(f"::notice::PR #{pr_number} is a draft; skipping")
+        return None, ctx
     if pr.get("base", {}).get("ref") != WATCH_BRANCH:
         post_comment(pr_number, f"merge-queue: skipped; base branch is not `{WATCH_BRANCH}`.", dry_run=dry_run)
-        return 0
+        return None, ctx
     if pr.get("head", {}).get("repo_id") != pr.get("base", {}).get("repo_id"):
         post_comment(pr_number, "merge-queue: skipped; fork PRs are not supported by the serialized queue.", dry_run=dry_run)
-        return 0
+        return None, ctx

     head_sha = pr.get("head", {}).get("sha")
if not isinstance(head_sha, str) or len(head_sha) < 7:
|
||||
raise ApiError(f"PR #{pr_number} missing head sha")
|
||||
commits = get_pull_commits(pr_number)
|
||||
current_base = pr_has_current_base(pr, commits, main_sha)
|
||||
# Fail-closed: a failed status fetch raises here and the tick is skipped
|
||||
# (the PR is never treated as green).
|
||||
# Fail-closed: a failed status fetch raises here and propagates (the PR is
|
||||
# never treated as green).
|
||||
pr_status = get_combined_status(head_sha)
|
||||
pr_labels = label_names(pr)
|
||||
# FAIL-CLOSED: Gitea returns mergeable=None (or omits the field) while it is
|
||||
# still COMPUTING conflict state. Only the literal True is decisive proof the
|
||||
# PR is conflict-free; None and False both mean "not (yet) mergeable". We must
|
||||
# NOT autonomously merge on an unknown — treat anything but True as not-yet-
|
||||
# mergeable so evaluate_merge_readiness returns a transient "wait" decision.
|
||||
# This is transient: process_once returns 0 (no hold label, no dequeue) and
|
||||
# the PR is re-checked next tick once Gitea has finished computing mergeability.
|
||||
mergeable_field = pr.get("mergeable")
|
||||
mergeable = mergeable_field is True
|
||||
# FAIL-CLOSED, TRI-STATE: Gitea returns mergeable=None (or omits the field)
|
||||
# while it is still COMPUTING conflict state, mergeable=False for a definitive
|
||||
# conflict, and mergeable=True only when it has proven the PR conflict-free.
|
||||
# We preserve all THREE states (do NOT collapse None/missing into False):
|
||||
# - True → direct-merge eligible (step 5).
|
||||
# - None / missing → still computing → WAIT (never merge, never update,
|
||||
# never dismiss approvals); re-check next tick.
|
||||
# - False → definitive conflict → the update/hold path (step 6).
|
||||
# Collapsing None→False would route a behind-main but STILL-COMPUTING PR into
|
||||
# the /update path, which triggers dismiss_stale_approvals — the exact
|
||||
# rebase-churn this change eliminates. Normalize only to the literal True /
|
||||
# False / None set (some Gitea versions omit the key entirely → None).
|
||||
raw_mergeable = pr.get("mergeable")
|
||||
mergeable: bool | None = raw_mergeable if isinstance(raw_mergeable, bool) else None
|
||||
|
||||
reviews = get_pull_reviews(pr_number)
|
||||
approvers, request_changes = genuine_approvals(
|
||||
@@ -779,7 +1155,7 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
decision = evaluate_merge_readiness(
|
||||
main_status=main_status,
|
||||
pr_status=pr_status,
|
||||
required_contexts=contexts,
|
||||
required_contexts=required_contexts,
|
||||
required_approvals=required_approvals,
|
||||
approvers=approvers,
|
||||
request_changes=request_changes,
|
||||
@@ -787,72 +1163,7 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
mergeable=mergeable,
|
||||
pr_labels=pr_labels,
|
||||
)
|
||||
|
||||
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
|
||||
if decision.action == "update":
|
||||
try:
|
||||
update_pull(pr_number, dry_run=dry_run)
|
||||
except BranchUpdateConflictError as exc:
|
||||
# The branch cannot be updated with main because of a real conflict
|
||||
# (HTTP 409). This is the HOL fix for issue #2352: previously the
|
||||
# 409 propagated to main() and the tick exited 0 with the PR still
|
||||
# queued, so the NEXT tick re-selected the SAME conflicted PR and
|
||||
# retried the failing update forever — head-of-line-blocking every
|
||||
# ready PR behind it. A conflict will not self-resolve; it needs a
|
||||
# human/agent rebase. So HOLD this PR (HOL guard) and advance to the
|
||||
# next candidate. Fail-closed: a held PR is skipped, never merged.
|
||||
sys.stderr.write(
|
||||
f"::error::branch-update conflict for PR #{pr_number}: {exc}\n"
|
||||
)
|
||||
hold_note = (
|
||||
"merge-queue: could not update this branch with "
|
||||
f"`{WATCH_BRANCH}` — the update returned a merge conflict "
|
||||
f"(HTTP 409) that the queue cannot auto-resolve ({exc}). "
|
||||
f"Applied `{HOLD_LABEL}` to unblock the queue (HOL guard). "
|
||||
f"Fix: rebase/merge `{WATCH_BRANCH}` into this branch and "
|
||||
f"resolve the conflicts, then remove `{HOLD_LABEL}` to requeue."
|
||||
)
|
||||
hold_pr(pr_number, hold_note, dry_run=dry_run)
|
||||
return 0
|
||||
post_comment(
|
||||
pr_number,
|
||||
(
|
||||
f"merge-queue: updated this branch with `{WATCH_BRANCH}` at "
|
||||
f"`{main_sha[:12]}`. Waiting for CI on the refreshed head."
|
||||
),
|
||||
dry_run=dry_run,
|
||||
)
|
||||
return 0
|
||||
if decision.ready:
|
||||
latest_main_sha = get_branch_head(WATCH_BRANCH)
|
||||
if latest_main_sha != main_sha:
|
||||
print(
|
||||
f"::notice::main moved {main_sha[:8]} -> {latest_main_sha[:8]}; "
|
||||
"deferring to next tick"
|
||||
)
|
||||
return 0
|
||||
try:
|
||||
merge_pull(pr_number, dry_run=dry_run, force=decision.force)
|
||||
except MergePermissionError as exc:
|
||||
# Permanent merge failure (HTTP 403/404/405). This is the
|
||||
# head-of-line (HOL) bug fix: previously we returned 0 with the PR
|
||||
# still queued, so the next tick re-selected the SAME wedged PR
|
||||
# forever and the queue never advanced. Instead, HOLD this PR by
|
||||
# applying HOLD_LABEL (choose_next_queued_issue skips held PRs), so
|
||||
# the queue moves on to the next candidate. A maintainer removes
|
||||
# the hold once the permission issue is fixed.
|
||||
sys.stderr.write(f"::error::merge permission error for PR #{pr_number}: {exc}\n")
|
||||
hold_note = (
|
||||
"merge-queue: merge failed with a permanent permission error "
|
||||
f"({exc}). No available token has Can-merge permission for this "
|
||||
f"PR. Applied `{HOLD_LABEL}` to unblock the queue (HOL guard). "
|
||||
f"Fix: grant Can-merge to the queue token, then remove "
|
||||
f"`{HOLD_LABEL}` to requeue."
|
||||
)
|
||||
hold_pr(pr_number, hold_note, dry_run=dry_run)
|
||||
return 0
|
||||
return 0
|
||||
return 0
|
||||
return decision, ctx
|
||||
|
||||
|
||||
def main() -> int:
|
||||
@@ -863,18 +1174,18 @@ def main() -> int:
|
||||
try:
|
||||
return process_once(dry_run=args.dry_run)
|
||||
except ApiError as exc:
|
||||
# API errors (401/403/404/500) are transient for a queue tick —
|
||||
# log and exit 0 so the workflow is not marked failed and the next
|
||||
# tick can retry. Returning non-zero would permanently fail the
|
||||
# workflow run, blocking future ticks.
|
||||
# FAIL-CLOSED: API errors are not "transient success" — they mean
|
||||
# the queue could not evaluate merge state. Returning 0 hides
|
||||
# persistent infra issues (auth drift, endpoint outages) from
|
||||
# operators. Return 1 so the cron job surfaces red and paging fires.
|
||||
sys.stderr.write(f"::error::queue API error: {exc}\n")
|
||||
return 0
|
||||
return 1
|
||||
except urllib.error.URLError as exc:
|
||||
sys.stderr.write(f"::error::queue network error: {exc}\n")
|
||||
return 0
|
||||
return 1
|
||||
except TimeoutError as exc:
|
||||
sys.stderr.write(f"::error::queue timeout: {exc}\n")
|
||||
return 0
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
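The tri-state `mergeable` handling described in the comments of `_evaluate_candidate` can be illustrated on its own. A minimal sketch, assuming the pull payload is a plain dict; `normalize_mergeable` and `route` are hypothetical helper names, and the routing labels are illustrative rather than the script's real action names.

```python
from __future__ import annotations


def normalize_mergeable(pr: dict) -> bool | None:
    """Map the pull payload's `mergeable` field to True / False / None."""
    raw = pr.get("mergeable")
    # Only literal booleans are decisive. A missing key, None, or any other
    # type (for example the string "true") is treated as "still computing".
    return raw if isinstance(raw, bool) else None


def route(mergeable: bool | None) -> str:
    """Hypothetical routing for the three states."""
    if mergeable is True:
        return "merge-eligible"
    if mergeable is False:
        return "update-or-hold"
    return "wait"  # unknown: never merge, never update, re-check next tick
```

Keeping `None` distinct from `False` is the point of the change: an unknown result waits instead of being routed into the update path.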
||||
@@ -546,16 +546,24 @@ def verify_flip(flip: dict, branch: str, n: int) -> dict:
|
||||
|
||||
shas = recent_commits_on_branch(branch, n)
|
||||
if not shas:
|
||||
result["warnings"].append(
|
||||
f"no recent commits on {branch} (cannot verify flip)"
|
||||
)
|
||||
result["masked_runs"].append({
|
||||
"sha": "",
|
||||
"status": "unverified",
|
||||
"target_url": "",
|
||||
"samples": [f"no recent commits on {branch} — cannot verify flip"],
|
||||
})
|
||||
return result
|
||||
|
||||
for sha in shas:
|
||||
try:
|
||||
status_doc = combined_status(sha)
|
||||
except ApiError as e:
|
||||
result["warnings"].append(f"combined-status for {sha}: {e}")
|
||||
result["masked_runs"].append({
|
||||
"sha": sha,
|
||||
"status": "error",
|
||||
"target_url": "",
|
||||
"samples": [f"combined-status API error: {e}"],
|
||||
})
|
||||
continue
|
||||
statuses = status_doc.get("statuses") or []
|
||||
# First entry matching the context name. Newest SHAs come
|
||||
@@ -582,6 +590,17 @@ def verify_flip(flip: dict, branch: str, n: int) -> dict:
|
||||
"target_url": target_url,
|
||||
"samples": ["[log unavailable; status itself is " + state + "]"],
|
||||
})
|
||||
elif state == "success":
|
||||
# Fail-closed: unreadable log on a success status is a
|
||||
# potential Quirk #10 mask (continue-on-error hiding real
|
||||
# failures). We cannot verify it's clean, so treat as
|
||||
# masked rather than allowing the flip.
|
||||
result["masked_runs"].append({
|
||||
"sha": sha,
|
||||
"status": state,
|
||||
"target_url": target_url,
|
||||
"samples": ["[log unavailable; cannot verify status is genuine — treat as masked]"],
|
||||
})
|
||||
break
|
||||
samples = grep_fail_markers(log_text)
|
||||
if state in ("failure", "error"):
|
||||
@@ -605,10 +624,12 @@ def verify_flip(flip: dict, branch: str, n: int) -> dict:
|
||||
break
|
||||
|
||||
if result["checked_commits"] == 0:
|
||||
result["warnings"].append(
|
||||
f"no runs of {target_context!r} found in the last {n} commits on "
|
||||
f"{branch} — cannot verify; allowing flip with warning"
|
||||
)
|
||||
result["masked_runs"].append({
|
||||
"sha": "",
|
||||
"status": "unverified",
|
||||
"target_url": "",
|
||||
"samples": [f"no runs of {target_context!r} found in the last {n} commits on {branch} — cannot verify flip"],
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
|
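The fail-closed pattern in `verify_flip` records every "cannot verify" case as a masked run instead of only a warning. A minimal sketch of that idea follows; `unverified_entry` and `flip_is_blocked` are hypothetical helpers, and the assumption that a caller blocks whenever `fail_runs` or `masked_runs` is non-empty is inferred from the test names, not shown in this diff.

```python
def unverified_entry(reason: str, sha: str = "", status: str = "unverified") -> dict:
    """Build a masked-run record with the same keys the diff appends."""
    return {"sha": sha, "status": status, "target_url": "", "samples": [reason]}


def flip_is_blocked(verdict: dict) -> bool:
    """Assumed caller policy: any failing or masked run blocks the flip."""
    return bool(verdict.get("fail_runs")) or bool(verdict.get("masked_runs"))


verdict = {"fail_runs": [], "masked_runs": [], "warnings": []}
# No history to inspect: record it as masked so the flip cannot pass silently.
verdict["masked_runs"].append(
    unverified_entry("no recent commits on main; cannot verify flip")
)
```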
||||
@@ -197,19 +197,15 @@ if [ "$HTTP_CODE" != "200" ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Filter: state=APPROVED, not-dismissed, non-author. Optionally strict-mode
|
||||
# adds commit_id==head.sha (off by default; see header).
|
||||
# Filter: state=APPROVED, official=true, not-dismissed, non-author,
|
||||
# commit_id matches current PR head. All conditions are mandatory.
|
||||
JQ_FILTER='.[]
|
||||
| select(.state == "APPROVED")
|
||||
| select(.official == true)
|
||||
| select(.dismissed != true)
|
||||
| select(.official != false)
|
||||
| select(.user.login != $author)'
|
||||
if [ "${REVIEW_CHECK_STRICT:-}" = "1" ]; then
|
||||
JQ_FILTER="${JQ_FILTER}
|
||||
| select(.commit_id == \$head)"
|
||||
fi
|
||||
JQ_FILTER="${JQ_FILTER}
|
||||
| .user.login"
|
||||
| select(.user.login != $author)
|
||||
| select(.commit_id == $head)
|
||||
| .user.login'
|
||||
|
||||
REVIEW_CANDIDATES=$(jq -r --arg author "$PR_AUTHOR" --arg head "$PR_HEAD_SHA" "$JQ_FILTER" "$REVIEWS_JSON" | sort -u)
|
||||
debug "candidate non-author approvers: $(echo "$REVIEW_CANDIDATES" | tr '\n' ' ')"
|
||||
@@ -241,49 +237,14 @@ if [ -z "$REVIEW_CANDIDATES" ]; then
|
||||
|
||||
fi
|
||||
|
||||
# --- Fallback/extension (internal#348): check issue comments for agent-approval ---
|
||||
# core-qa-agent and core-security-agent can approve via issue comments. Always
|
||||
# include comment candidates, even if the reviews API returned approvals for a
|
||||
# different team; team membership below is the authoritative filter.
|
||||
COMMENT_CANDIDATES=""
|
||||
AGENT_PATTERN=""
|
||||
case "$TEAM" in
|
||||
qa) AGENT_PATTERN="\\[core-qa-agent\\]" ;;
|
||||
security) AGENT_PATTERN="\\[core-security-agent\\]" ;;
|
||||
esac
|
||||
HTTP_CODE=$(curl -sS -o "$COMMENTS_JSON" -w '%{http_code}' \
|
||||
-K "$CURL_AUTH_FILE" "${API}/repos/${OWNER}/${NAME}/issues/${PR_NUMBER}/comments")
|
||||
debug "GET /issues/${PR_NUMBER}/comments → HTTP ${HTTP_CODE}"
|
||||
if [ "$HTTP_CODE" = "200" ]; then
|
||||
# JQ expression: select non-author comments that match either the
|
||||
# agent-prefix pattern (case-insensitive) OR a generic approval keyword.
|
||||
JQ_APPROVALS='
|
||||
.[] |
|
||||
select(.user.login != $author) |
|
||||
. as $cmt |
|
||||
if ($agent_pattern | length) > 0 and ($cmt.body // "" | test($agent_pattern; "i")) then
|
||||
$cmt.user.login
|
||||
elif ($cmt.body // "" | test("\\b(APPROVED|LGTM|ACCEPTED)\\b"; "i")) then
|
||||
$cmt.user.login
|
||||
else
|
||||
empty
|
||||
end
|
||||
'
|
||||
COMMENT_CANDIDATES=$(jq -r \
|
||||
--arg author "$PR_AUTHOR" \
|
||||
--arg agent_pattern "$AGENT_PATTERN" \
|
||||
"$JQ_APPROVALS" \
|
||||
"$COMMENTS_JSON" 2>/dev/null | sort -u)
|
||||
debug "comment-based approval candidates: $(echo "$COMMENT_CANDIDATES" | tr '\n' ' ')"
|
||||
|
||||
if [ -n "$COMMENT_CANDIDATES" ]; then
|
||||
echo "::notice::${TEAM}-review: found $(echo "$COMMENT_CANDIDATES" | wc -w | xargs) comment-based approval candidate(s) — verifying team membership..."
|
||||
fi
|
||||
else
|
||||
debug "could not fetch issue comments (HTTP ${HTTP_CODE})"
|
||||
fi
|
||||
|
||||
CANDIDATES=$(printf '%s\n%s\n' "$REVIEW_CANDIDATES" "$COMMENT_CANDIDATES" | sed '/^$/d' | sort -u)
|
||||
# --- COMMENT APPROVAL REMOVED (security hardening) ---
|
||||
# Previous versions accepted issue comments containing generic approval
|
||||
# keywords (APPROVED/LGTM/ACCEPTED) or agent prefixes ([core-qa-agent],
|
||||
# [core-security-agent]) as satisfying the gate. Both paths are bypasses:
|
||||
# a comment lacks the audit trail, dismissal, stale-review invalidation,
|
||||
# and commit_id binding that an official Gitea review provides.
|
||||
# Only APPROVED reviews from the Gitea reviews API count.
|
||||
CANDIDATES="$REVIEW_CANDIDATES"
|
||||
|
||||
if [ -z "${CANDIDATES:-}" ]; then
|
||||
echo "::error::${TEAM}-review awaiting non-author APPROVE from ${TEAM} team (no candidates from reviews API or issue comments)"
|
||||
|
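For readers less familiar with jq, the hardened review filter above can be restated in Python. This is a sketch of the same five conditions, not part of the script; `qualifying_approvers` is a hypothetical name, and skipping reviews without a login is an assumption (jq would print `null` there).

```python
from __future__ import annotations


def qualifying_approvers(reviews: list[dict], author: str, head_sha: str) -> list[str]:
    """Return sorted, de-duplicated logins whose review satisfies every condition."""
    logins: set[str] = set()
    for review in reviews:
        if review.get("state") != "APPROVED":
            continue
        if review.get("official") is not True:  # a missing field is rejected
            continue
        if review.get("dismissed") is True:
            continue
        login = (review.get("user") or {}).get("login")
        if not login or login == author:  # no self-approval
            continue
        if review.get("commit_id") != head_sha:  # stale-head approval
            continue
        logins.add(login)
    return sorted(logins)
```

Note the asymmetry, which mirrors the jq expression: `official` must be present and true, while `dismissed` only disqualifies when it is explicitly true.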
||||
@@ -48,7 +48,6 @@ set -euo pipefail
|
||||
# workflow-level jq install can fail on runners with network restrictions
|
||||
# (GitHub releases not reachable from some runner networks — infra#241
|
||||
# follow-up). This fallback is idempotent — no-op when jq is already on PATH.
|
||||
# SOP_FAIL_OPEN=1 makes this always exit 0 so CI never blocks on jq absence.
|
||||
if ! command -v jq >/dev/null 2>&1; then
|
||||
echo "::notice::jq not found on PATH — attempting install..."
|
||||
_jq_installed="no"
|
||||
@@ -67,12 +66,6 @@ if ! command -v jq >/dev/null 2>&1; then
|
||||
if ! command -v jq >/dev/null 2>&1; then
|
||||
echo "::error::jq installation failed — apt-get and GitHub binary both failed."
|
||||
echo "::error::sop-tier-check requires jq for all JSON API parsing."
|
||||
# SOP_FAIL_OPEN=1 is set in the workflow step's env — makes script always
|
||||
# exit 0 so CI never blocks. The SOP-6 tier review gate remains enforced.
|
||||
if [ "${SOP_FAIL_OPEN:-}" = "1" ]; then
|
||||
echo "::warning::SOP_FAIL_OPEN=1 — exiting 0 so CI does not block."
|
||||
exit 0
|
||||
fi
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
@@ -101,15 +94,10 @@ echo "::notice::tier-check start: repo=$OWNER/$NAME pr=$PR_NUMBER author=$PR_AUT
|
||||
# cause the script to exit prematurely when the token is empty/invalid — the
|
||||
# if check below handles that case gracefully. Without || true, a 401 from an
|
||||
# empty/invalid token causes jq to exit 1, triggering set -e and exiting the
|
||||
# entire script before SOP_FAIL_OPEN can be evaluated (the check is in the jq-
|
||||
# install block; if jq is already on PATH, that block is skipped entirely).
|
||||
# entire script before the error can be logged.
|
||||
WHOAMI=$(curl -sS -H "$AUTH" "${API}/user" | jq -r '.login // ""') || true
|
||||
if [ -z "$WHOAMI" ]; then
|
||||
echo "::error::GITEA_TOKEN cannot resolve a user via /api/v1/user — check the token scope and that the secret is wired correctly."
|
||||
if [ "${SOP_FAIL_OPEN:-}" = "1" ]; then
|
||||
echo "::warning::SOP_FAIL_OPEN=1 — exiting 0 so CI does not block."
|
||||
exit 0
|
||||
fi
|
||||
exit 1
|
||||
fi
|
||||
echo "::notice::token resolves to user: $WHOAMI"
|
||||
@@ -119,10 +107,6 @@ echo "::notice::token resolves to user: $WHOAMI"
|
||||
HEAD_SHA=$(curl -sS -H "$AUTH" "${API}/repos/${OWNER}/${NAME}/pulls/${PR_NUMBER}" | jq -r '.head.sha // ""') || true
|
||||
if [ -z "$HEAD_SHA" ]; then
|
||||
echo "::error::Failed to fetch PR head SHA — token may be invalid."
|
||||
if [ "${SOP_FAIL_OPEN:-}" = "1" ]; then
|
||||
echo "::warning::SOP_FAIL_OPEN=1 — exiting 0 so CI does not block."
|
||||
exit 0
|
||||
fi
|
||||
exit 1
|
||||
fi
|
||||
debug "pr-head-sha=$HEAD_SHA"
|
||||
@@ -160,18 +144,14 @@ debug "tier=$TIER"
|
||||
# as unachievable (would always fail) — operators notice the clear error
|
||||
# and create the missing team.
|
||||
#
|
||||
# Current Gitea teams: ceo, engineers, managers
|
||||
# Future teams (create before removing "???" fallback): qa, security, security-audit
|
||||
# Current Gitea teams: ceo, engineers, managers, qa, security
|
||||
declare -A TIER_EXPR=(
|
||||
# tier:low — same as previous OR gate: any engineer, manager, or ceo.
|
||||
["tier:low"]="engineers,managers,ceo"
|
||||
|
||||
# tier:medium — AND of (managers) AND (engineers) AND (qa???,security???)
|
||||
# The qa+security clause requires both teams to exist; when not yet
|
||||
# created, the PR author is responsible for adding them before requesting
|
||||
# approval on a tier:medium PR. Ops: create qa + security Gitea teams
|
||||
# and update this map to remove the "???" markers (internal#189 follow-up).
|
||||
["tier:medium"]="managers AND engineers AND qa???,security???"
|
||||
# tier:medium — AND of (managers) AND (engineers) AND (qa,security)
|
||||
# ≥1 approver from managers AND ≥1 from engineers AND ≥1 from qa OR security.
|
||||
["tier:medium"]="managers AND engineers AND qa,security"
|
||||
|
||||
# tier:high — ceo only. The AND-composition adds no value for a
|
||||
# single-team gate, but the framework is wired for consistency.
|
||||
@@ -215,10 +195,6 @@ if [ "${SOP_DEBUG:-}" = "1" ]; then
|
||||
fi
|
||||
if [ "$_HTTP_EXIT" -ne 0 ] || [ "$HTTP_CODE" != "200" ]; then
|
||||
echo "::error::GET /orgs/${OWNER}/teams failed (curl exit=$_HTTP_EXIT HTTP=$HTTP_CODE) — token may lack read:org scope or be invalid."
|
||||
if [ "${SOP_FAIL_OPEN:-}" = "1" ]; then
|
||||
echo "::warning::SOP_FAIL_OPEN=1 — exiting 0 so CI does not block."
|
||||
exit 0
|
||||
fi
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -265,17 +241,13 @@ done
|
||||
|
||||
# 5. Read approving reviewers. set +e disables set -e temporarily so that curl
|
||||
# failures (e.g. empty/invalid token → HTTP 401) do not abort the script before
|
||||
# SOP_FAIL_OPEN is evaluated. set -e is restored immediately after.
|
||||
# set -e is restored immediately after.
|
||||
set +e
|
||||
REVIEWS=$(curl -sS -H "$AUTH" "${API}/repos/${OWNER}/${NAME}/pulls/${PR_NUMBER}/reviews")
|
||||
_REVIEWS_EXIT=$?
|
||||
set -e
|
||||
if [ $_REVIEWS_EXIT -ne 0 ] || [ -z "$REVIEWS" ]; then
|
||||
echo "::error::Failed to fetch reviews (curl exit=$_REVIEWS_EXIT) — token may be invalid or unreachable."
|
||||
if [ "${SOP_FAIL_OPEN:-}" = "1" ]; then
|
||||
echo "::warning::SOP_FAIL_OPEN=1 — exiting 0 so CI does not block."
|
||||
exit 0
|
||||
fi
|
||||
exit 1
|
||||
fi
|
||||
APPROVERS=$(echo "$REVIEWS" | jq -r --arg head_sha "$HEAD_SHA" '[.[] | select(.state=="APPROVED" and .commit_id == $head_sha) | .user.login] | unique | .[]') || true
|
||||
|
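The `TIER_EXPR` grammar above (clauses joined by `AND`, with comma-separated teams inside a clause meaning OR) can be sketched as a small evaluator. This is an illustration under stated assumptions, not the script's implementation: `tier_satisfied` is a hypothetical name, `approver_teams` is an assumed mapping from approver login to the teams that approver belongs to, and the sketch lets one approver satisfy more than one clause, which the source does not confirm.

```python
from __future__ import annotations


def tier_satisfied(expr: str, approver_teams: dict[str, set[str]]) -> bool:
    """True when every AND clause has at least one approver in one of its teams."""
    # Union of all teams that have at least one approver.
    covered: set[str] = set().union(*approver_teams.values())
    for clause in expr.split(" AND "):
        teams = {team.strip() for team in clause.split(",") if team.strip()}
        if not teams & covered:
            return False  # this clause has no approver from any listed team
    return True
```

For example, `tier_satisfied("managers AND engineers AND qa,security", ...)` needs an approver from `managers`, one from `engineers`, and one from either `qa` or `security`.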
||||
@@ -689,8 +689,8 @@ def reap_branch(
|
||||
shas = list_recent_commit_shas(branch, limit)
|
||||
except ApiError as e:
|
||||
print(
|
||||
"::warning::status-reaper skipped this tick because the "
|
||||
f"commit list could not be read after retries: {e}"
|
||||
"::error::status-reaper cannot run: commit-list API failed "
|
||||
f"after retries: {e}"
|
||||
)
|
||||
return {
|
||||
"scanned_shas": 0,
|
||||
@@ -704,6 +704,7 @@ def reap_branch(
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_per_sha": {},
|
||||
"sha_api_errors": 0,
|
||||
"skipped": True,
|
||||
"skip_reason": "commit-list-api-error",
|
||||
}
|
||||
@@ -720,6 +721,7 @@ def reap_branch(
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_per_sha": {},
|
||||
"sha_api_errors": 0,
|
||||
}
|
||||
|
||||
for sha in shas:
|
||||
@@ -731,8 +733,9 @@ def reap_branch(
|
||||
try:
|
||||
combined = get_combined_status(sha)
|
||||
except ApiError as e:
|
||||
aggregate["sha_api_errors"] += 1
|
||||
print(
|
||||
f"::warning::get_combined_status({sha[:10]}) failed; "
|
||||
f"::error::get_combined_status({sha[:10]}) failed; "
|
||||
f"skipping this SHA: {e}"
|
||||
)
|
||||
continue
|
||||
@@ -819,6 +822,14 @@ def main() -> int:
|
||||
sort_keys=True,
|
||||
)
|
||||
)
|
||||
# Observability: infra-failure → red. If the commit list could not be
|
||||
# read or any per-SHA status fetch failed, the tick is incomplete and
|
||||
# must be observable as a failure (non-zero exit) so the cron bot or
|
||||
# runner surfaces alerts.
|
||||
if counters.get("skipped"):
|
||||
return 1
|
||||
if counters.get("sha_api_errors", 0) > 0:
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -109,23 +109,34 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
return self._json(200, [{
|
||||
"state": "APPROVED",
|
||||
"dismissed": True,
|
||||
"official": True,
|
||||
"user": {"login": "core-devops"},
|
||||
"commit_id": "abc1234",
|
||||
"commit_id": "deadbeef0000111122223333444455556666",
|
||||
}])
|
||||
if sc == "T3_reviews_approved_non_author":
|
||||
return self._json(200, [
|
||||
{"state": "CHANGES_REQUESTED", "dismissed": False, "user": {"login": "bob"}, "commit_id": "abc1234"},
|
||||
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "abc1234"},
|
||||
{"state": "CHANGES_REQUESTED", "dismissed": False, "official": True, "user": {"login": "bob"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
{"state": "APPROVED", "dismissed": False, "official": True, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
])
|
||||
if sc == "T19_ai_sop_ack_approved":
|
||||
# ai-sop-ack member submitted APPROVED review — must NOT count
|
||||
# toward qa-review (team_id=20) or security-review (team_id=21).
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "dismissed": False, "user": {"login": "ai-reviewer"}, "commit_id": "abc1234"},
|
||||
{"state": "APPROVED", "dismissed": False, "official": True, "user": {"login": "ai-reviewer"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
])
|
||||
# Default: one non-author APPROVED
|
||||
if sc == "T21_stale_head_approved":
|
||||
# APPROVED review but on an old commit (stale head) → must be rejected
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "dismissed": False, "official": True, "user": {"login": "core-devops"}, "commit_id": "oldsha0000000000000000000000000000"},
|
||||
])
|
||||
if sc == "T22_missing_official":
|
||||
# APPROVED review with no official field → must be rejected
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
])
|
||||
# Default: one non-author APPROVED (current head, official)
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "abc1234"},
|
||||
{"state": "APPROVED", "dismissed": False, "official": True, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
])
|
||||
|
||||
# GET /repos/{owner}/{name}/issues/{pr_number}/comments
|
||||
|
||||
File diff suppressed because it is too large
@@ -320,10 +320,10 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
self.assertEqual(len(verdict["fail_runs"]), 1)
|
||||
self.assertEqual(verdict["fail_runs"][0]["status"], "failure")
|
||||
|
||||
def test_unreadable_log_warns_not_blocks(self):
|
||||
# Acceptance test #5: log fetch 404 (None) → warn, not block.
|
||||
# Status is `success`, log is None — we can't tell, so we warn
|
||||
# and allow.
|
||||
def test_unreadable_log_on_success_blocks(self):
|
||||
# Fail-closed: log fetch 404 (None) on a success status is a
|
||||
# potential Quirk #10 mask — we cannot verify it's genuine, so
|
||||
# we block the flip rather than allowing it.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=["sha1"]):
|
||||
with mock.patch.object(
|
||||
lpfc, "combined_status",
|
||||
@@ -332,7 +332,8 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
with mock.patch.object(lpfc, "fetch_log", return_value=None):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
self.assertEqual(verdict["masked_runs"], [])
|
||||
self.assertEqual(len(verdict["masked_runs"]), 1)
|
||||
self.assertIn("log unavailable", verdict["masked_runs"][0]["samples"][0])
|
||||
self.assertTrue(any("log unavailable" in w for w in verdict["warnings"]))
|
||||
|
||||
def test_unreadable_log_with_failure_status_still_blocks(self):
|
||||
@@ -349,9 +350,9 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
self.assertEqual(len(verdict["fail_runs"]), 1)
|
||||
self.assertIn("log unavailable", verdict["fail_runs"][0]["samples"][0])
|
||||
|
||||
def test_zero_runs_history_warns_allows(self):
|
||||
# No commits with a matching context — newly added workflow.
|
||||
# Allow with warning.
|
||||
def test_zero_runs_history_blocks(self):
|
||||
# No commits with a matching context — cannot verify the flip.
|
||||
# Fail-closed: treat as masked rather than allowing.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=["sha1", "sha2"]):
|
||||
with mock.patch.object(
|
||||
lpfc, "combined_status",
|
||||
@@ -360,17 +361,32 @@ class TestVerifyFlip(unittest.TestCase):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["checked_commits"], 0)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
self.assertEqual(verdict["masked_runs"], [])
|
||||
self.assertTrue(any("no runs of" in w for w in verdict["warnings"]))
|
||||
self.assertEqual(len(verdict["masked_runs"]), 1)
|
||||
self.assertIn("cannot verify flip", verdict["masked_runs"][0]["samples"][0])
|
||||
|
||||
def test_zero_commits_warns_allows(self):
|
||||
# Empty branch (newly created repo, e.g.). Allow with warning.
|
||||
def test_zero_commits_blocks(self):
|
||||
# Empty branch (e.g. a newly created repo). Fail-closed: block.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=[]):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["checked_commits"], 0)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
self.assertEqual(verdict["masked_runs"], [])
|
||||
self.assertTrue(any("no recent commits" in w for w in verdict["warnings"]))
|
||||
self.assertEqual(len(verdict["masked_runs"]), 1)
|
||||
self.assertIn("cannot verify flip", verdict["masked_runs"][0]["samples"][0])
|
||||
|
||||
def test_combined_status_api_error_blocks(self):
|
||||
# Fail-closed: combined_status ApiError means the check history is
|
||||
# unreadable — we cannot verify the flip, so block as masked.
|
||||
with mock.patch.object(lpfc, "recent_commits_on_branch", return_value=["sha1"]):
|
||||
with mock.patch.object(
|
||||
lpfc, "combined_status",
|
||||
side_effect=lpfc.ApiError("GET /statuses/sha → HTTP 500"),
|
||||
):
|
||||
verdict = lpfc.verify_flip(FLIP_FIXTURE, "main", 5)
|
||||
self.assertEqual(verdict["checked_commits"], 0)
|
||||
self.assertEqual(verdict["fail_runs"], [])
|
||||
# One masked_run from the ApiError, one from zero checked_commits.
|
||||
self.assertEqual(len(verdict["masked_runs"]), 2)
|
||||
self.assertIn("API error", verdict["masked_runs"][0]["samples"][0])
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
@@ -14,10 +14,17 @@
|
||||
# T9 — team membership probe → 403 (token not in team) → script exits 1 (fail closed)
|
||||
# T10 — CURL_AUTH_FILE created with mode 600 and correct header content
|
||||
# T11 — bash syntax check (bash -n passes)
|
||||
# T12 — jq filter: non-author APPROVED → in candidate list; dismissed → excluded
|
||||
# T12 — jq filter: non-author APPROVED official current-head → in candidate list; dismissed → excluded
|
||||
# T13 — missing required env GITEA_TOKEN → exits 1 with error
|
||||
# T14 — non-default-base PR exits 0 without requiring review
|
||||
# T18 — wrong-team review candidate does not block right-team comment approval
|
||||
# T15 — comment agent-prefix approval → exit 1
|
||||
# T16 — comment generic keyword approval → exit 1
|
||||
# T17 — comments with no approval keywords → exit 1
|
||||
# T18 — wrong-team review + right-team comment → exit 1
|
||||
# T19 — ai-sop-ack APPROVED review excluded from qa-review gate
|
||||
# T20 — ai-sop-ack APPROVED review excluded from security-review gate
|
||||
# T21 — stale-head APPROVED review → exit 1 (commit_id mismatch)
|
||||
# T22 — missing/non-official APPROVED review → exit 1 (official != true)
|
||||
#
|
||||
# Hostile-self-review (per feedback_assert_exact_not_substring):
|
||||
# this test MUST FAIL if the script is absent. Verified by running
|
||||
@@ -319,41 +326,50 @@ assert_file_contains "T10b printf header format (CURL_AUTH_FILE content)" "$T10_
|
||||
assert_file_contains "T10c 'header =' curl-config syntax" "$T10_AUTHFILE" 'header = "Authorization: token '
|
||||
rm -f "$T10_AUTHFILE"
|
||||
|
||||
# T12 — jq filter: non-author APPROVED included, dismissed excluded
|
||||
# T12 — jq filter: non-author APPROVED official current-head included; dismissed/stale/missing-official excluded
|
||||
echo
|
||||
echo "== T12 jq filter =="
|
||||
# These are tested indirectly via T3 and T6 above, but let's also test
|
||||
# the jq expression directly.
|
||||
JQ_FILTER='.[]
|
||||
| select(.state == "APPROVED")
|
||||
| select(.official == true)
|
||||
| select(.dismissed != true)
|
||||
| select(.user.login != "alice")
|
||||
| select(.commit_id == $head)
|
||||
| .user.login'
|
||||
|
||||
T12_INPUT='[{"state":"APPROVED","dismissed":false,"user":{"login":"core-devops"}},{"state":"CHANGES_REQUESTED","dismissed":false,"user":{"login":"bob"}},{"state":"APPROVED","dismissed":false,"user":{"login":"alice"}},{"state":"APPROVED","dismissed":true,"user":{"login":"carol"}}]'
|
||||
T12_INPUT='[{"state":"APPROVED","official":true,"dismissed":false,"commit_id":"deadbeef0000111122223333444455556666","user":{"login":"core-devops"}},{"state":"CHANGES_REQUESTED","official":true,"dismissed":false,"commit_id":"deadbeef0000111122223333444455556666","user":{"login":"bob"}},{"state":"APPROVED","official":true,"dismissed":false,"commit_id":"deadbeef0000111122223333444455556666","user":{"login":"alice"}},{"state":"APPROVED","official":true,"dismissed":true,"commit_id":"deadbeef0000111122223333444455556666","user":{"login":"carol"}},{"state":"APPROVED","official":false,"dismissed":false,"commit_id":"deadbeef0000111122223333444455556666","user":{"login":"dave"}},{"state":"APPROVED","official":true,"dismissed":false,"commit_id":"oldsha0000000000000000000000000000","user":{"login":"eve"}}]'
|
||||
|
||||
JQ_CMD=$(command -v jq 2>/dev/null || echo /tmp/jq)
|
||||
T12_CANDIDATES=$(echo "$T12_INPUT" | "$JQ_CMD" -r "$JQ_FILTER" 2>/dev/null | sort -u)
|
||||
assert_contains "T12 jq: core-devops (non-author APPROVED) in candidates" "core-devops" "$T12_CANDIDATES"
|
||||
T12_CANDIDATES=$(echo "$T12_INPUT" | "$JQ_CMD" -r --arg head "deadbeef0000111122223333444455556666" "$JQ_FILTER" 2>/dev/null | sort -u)
|
||||
assert_contains "T12 jq: core-devops (non-author APPROVED official current-head) in candidates" "core-devops" "$T12_CANDIDATES"
|
||||
assert_eq "T12 jq: alice (author) NOT in candidates" "" "$(echo "$T12_CANDIDATES" | grep '^alice$' || true)"
|
||||
assert_eq "T12 jq: carol (dismissed) NOT in candidates" "" "$(echo "$T12_CANDIDATES" | grep '^carol$' || true)"
|
||||
assert_eq "T12 jq: dave (official=false) NOT in candidates" "" "$(echo "$T12_CANDIDATES" | grep '^dave$' || true)"
|
||||
assert_eq "T12 jq: eve (stale head) NOT in candidates" "" "$(echo "$T12_CANDIDATES" | grep '^eve$' || true)"
|
||||
|
||||
# T15 — comment-based approval via agent prefix pattern → exit 0
|
||||
# T15 — comment-based approval via agent prefix pattern → exit 1
|
||||
# SECURITY: agent-prefix comments are also removed. A text prefix in an
|
||||
# issue comment is spoofable (any team member can type "[core-qa-agent]")
|
||||
# and lacks the audit trail of an official Gitea review.
|
||||
echo
|
||||
echo "== T15 comment agent-prefix approval =="
|
||||
T15_OUT=$(run_review_check "T15_comments_agent_approval")
|
||||
T15_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T15 exit code 0 (agent-comment approval + team member)" "0" "$T15_RC"
|
||||
assert_contains "T15 comment fallback notice" "comment-based approval" "$T15_OUT"
|
||||
assert_contains "T15 core-qa-agent APPROVED" "APPROVED by core-qa-agent" "$T15_OUT"
|
||||
assert_eq "T15 exit code 1 (agent-prefix comment rejected — not an official review)" "1" "$T15_RC"
|
||||
assert_contains "T15 no candidates error" "no candidates from reviews API or issue comments" "$T15_OUT"
|
||||
|
||||
# T16 — comment-based approval via generic APPROVED keyword → exit 0
|
||||
# T16 — comment-based approval via generic APPROVED keyword → exit 1
|
||||
# SECURITY: generic keywords (APPROVED/LGTM/ACCEPTED) must NOT satisfy the
|
||||
# gate: only official Gitea reviews count (agent-prefix comments are rejected too, see T15). A plain
|
||||
# comment from a team member is a bypass if it skips the review UI.
|
||||
echo
|
||||
echo "== T16 comment generic keyword approval =="
|
||||
T16_OUT=$(run_review_check "T16_comments_generic_approval")
|
||||
T16_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T16 exit code 0 (generic-approval comment + team member)" "0" "$T16_RC"
|
||||
assert_contains "T16 comment fallback notice" "comment-based approval" "$T16_OUT"
|
||||
assert_eq "T16 exit code 1 (generic-approval comment rejected — not an official review)" "1" "$T16_RC"
|
||||
assert_contains "T16 no candidates error" "no candidates from reviews API or issue comments" "$T16_OUT"
|
||||
|
||||
# T17 — no approval keywords in comments → exit 1
|
||||
echo
|
||||
@@ -363,16 +379,16 @@ T17_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T17 exit code 1 (no candidates from comments)" "1" "$T17_RC"
|
||||
assert_contains "T17 no candidates error" "no candidates from reviews API or issue comments" "$T17_OUT"
|
||||
|
||||
# T18 — a wrong-team PR review candidate must not suppress a right-team
|
||||
# comment approval. This matches PR #1790, where QA had an APPROVED review
|
||||
# and security approved via the agent comment convention.
|
||||
# T18 — wrong-team review + right-team comment → exit 1
|
||||
# SECURITY: with comment approval fully removed, a wrong-team review plus
|
||||
# a right-team comment yields NO valid candidates. Only official reviews
|
||||
# from the target team count.
|
||||
echo
|
||||
echo "== T18 review candidate wrong team, comment candidate right team =="
|
||||
T18_OUT=$(run_review_check "T18_review_wrong_team_comment_right_team")
|
||||
T18_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T18 exit code 0 (comment approval still considered)" "0" "$T18_RC"
|
||||
assert_contains "T18 comment candidate notice" "comment-based approval" "$T18_OUT"
|
||||
assert_contains "T18 comment approver accepted" "APPROVED by core-qa-agent" "$T18_OUT"
|
||||
assert_eq "T18 exit code 1 (comment approval removed — no valid candidates)" "1" "$T18_RC"
|
||||
assert_contains "T18 none are in team" "none are in team" "$T18_OUT"
|
||||
|
||||
# T19 — ai-sop-ack member APPROVED review must NOT count toward qa-review
|
||||
# or security-review (R1 hardening refinement, msg 1388c76f).
|
||||
@@ -393,6 +409,24 @@ assert_eq "T20 exit code 1 (ai-sop-ack not in security team)" "1" "$T20_RC"
|
||||
assert_contains "T20 ai-reviewer excluded from security" "candidates: ai-reviewer" "$T20_OUT"
|
||||
assert_contains "T20 none are in security team" "none are in team" "$T20_OUT"
|
||||
|
||||
# T21 — stale-head APPROVED review must be rejected (commit_id mismatch).
|
||||
# SECURITY: an approval on an old commit does not cover the current head.
|
||||
echo
|
||||
echo "== T21 stale-head APPROVED review rejected =="
|
||||
T21_OUT=$(run_review_check "T21_stale_head_approved")
|
||||
T21_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T21 exit code 1 (stale-head approval rejected)" "1" "$T21_RC"
|
||||
assert_contains "T21 no candidates error" "no candidates from reviews API or issue comments" "$T21_OUT"
|
||||
|
||||
# T22 — missing/non-official APPROVED review must be rejected.
|
||||
# SECURITY: only official Gitea reviews count; comments and non-official reviews lack an audit trail.
|
||||
echo
|
||||
echo "== T22 missing official flag APPROVED review rejected =="
|
||||
T22_OUT=$(run_review_check "T22_missing_official")
|
||||
T22_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T22 exit code 1 (missing official rejected)" "1" "$T22_RC"
|
||||
assert_contains "T22 no candidates error" "no candidates from reviews API or issue comments" "$T22_OUT"
|
||||
|
||||
echo
|
||||
echo "------"
|
||||
echo "PASS=$PASS FAIL=$FAIL"
|
||||
|
||||
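The T12, T21 and T22 cases above all exercise one predicate: an approval counts only if it is APPROVED, official, not dismissed, not written by the PR author, and pinned to the current head commit. The following is a minimal TypeScript sketch of that predicate, for illustration only. The `Review` interface and the `approvalCandidates` name are assumptions introduced here; the script under test uses the jq filter shown in T12.

```typescript
// Hypothetical shape of one entry from the reviews API; only the fields
// used by the T12 jq filter are modelled.
interface Review {
  state: string;
  official?: boolean;
  dismissed?: boolean;
  commit_id?: string;
  user: { login: string };
}

// Returns the sorted, de-duplicated logins whose review counts toward the gate.
function approvalCandidates(reviews: Review[], author: string, head: string): string[] {
  const seen: { [login: string]: boolean } = {};
  for (const r of reviews) {
    const counts =
      r.state === "APPROVED" &&
      r.official === true &&       // missing or false is rejected (T22)
      r.dismissed !== true &&      // dismissed approvals are rejected
      r.user.login !== author &&   // the author cannot self-approve
      r.commit_id === head;        // stale-head approvals are rejected (T21)
    if (counts) {
      seen[r.user.login] = true;
    }
  }
  return Object.keys(seen).sort();
}

// Same six reviews as T12_INPUT, with "alice" as the PR author.
const HEAD = "deadbeef0000111122223333444455556666";
const sampleReviews: Review[] = [
  { state: "APPROVED", official: true, dismissed: false, commit_id: HEAD, user: { login: "core-devops" } },
  { state: "CHANGES_REQUESTED", official: true, dismissed: false, commit_id: HEAD, user: { login: "bob" } },
  { state: "APPROVED", official: true, dismissed: false, commit_id: HEAD, user: { login: "alice" } },
  { state: "APPROVED", official: true, dismissed: true, commit_id: HEAD, user: { login: "carol" } },
  { state: "APPROVED", official: false, dismissed: false, commit_id: HEAD, user: { login: "dave" } },
  { state: "APPROVED", official: true, dismissed: false, commit_id: "oldsha0000000000000000000000000000", user: { login: "eve" } },
];

const candidates = approvalCandidates(sampleReviews, "alice", HEAD);
console.log(candidates.join(",")); // prints "core-devops"
```

Requiring `official === true` rather than `official !== false` is the fail-closed choice: a review object that omits the field is treated as not official.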
@@ -57,12 +57,12 @@ echo "test: tier:low OR-clause splits to 3 tokens"
|
||||
assert_eq "tier:low" "engineers|managers|ceo" "$(split_clause "engineers,managers,ceo")"
|
||||
|
||||
echo "test: tier:medium AND-expression — bash word-split on \$EXPR yields 5 tokens"
|
||||
EXPR="managers AND engineers AND qa???,security???"
|
||||
EXPR="managers AND engineers AND qa,security"
|
||||
out=""
|
||||
for _raw in $EXPR; do
|
||||
out="${out}${out:+ ; }$(split_clause "$_raw")"
|
||||
done
|
||||
assert_eq "tier:medium" "managers ; AND ; engineers ; AND ; qa???|security???" "$out"
|
||||
assert_eq "tier:medium" "managers ; AND ; engineers ; AND ; qa|security" "$out"
|
||||
|
||||
echo "test: tier:high single-team OR-clause"
|
||||
assert_eq "tier:high" "ceo" "$(split_clause "ceo")"
|
||||
|
||||
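The tier tests above pin down the observable behaviour of `split_clause`: commas inside one clause become `|`, while the surrounding expression is word-split on whitespace. A small TypeScript sketch of that behaviour follows. It is inferred from the assertions only; the shell implementation of `split_clause` is not shown in this diff, and the names `splitClause` and `splitExpression` are hypothetical.

```typescript
// Assumed behaviour of split_clause: one OR-clause of comma-separated team
// names in, pipe-separated tokens out.
function splitClause(raw: string): string {
  return raw.split(",").join("|");
}

// Mirrors the test loop: word-split the expression on whitespace, run each
// word through splitClause, and join the results with " ; ".
function splitExpression(expr: string): string {
  const words = expr.split(/\s+/).filter(function (w) {
    return w.length > 0;
  });
  const parts: string[] = [];
  for (const word of words) {
    parts.push(splitClause(word));
  }
  return parts.join(" ; ");
}

const tierLow = splitClause("engineers,managers,ceo");
const tierMedium = splitExpression("managers AND engineers AND qa,security");
console.log(tierLow);    // prints "engineers|managers|ceo"
console.log(tierMedium); // prints "managers ; AND ; engineers ; AND ; qa|security"
```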
@@ -418,10 +418,9 @@ jobs:
|
||||
# a manual action that determinism made obsolete.
|
||||
name: Canvas Deploy Status
|
||||
runs-on: docker-host
|
||||
# Job-level `if:` so ci-required-drift.py's ci_job_names() detects this as
|
||||
# github.ref-gated and skips it from the required-context F1 set (mc#1982).
|
||||
# Per-step no-op (not job-level `if:`) so the job reaches SUCCESS on PRs
|
||||
# instead of skipped — skipped poisons the PR combined status (internal#817).
|
||||
# Step-level exit 0 handles the "not a canvas main push" case.
|
||||
if: ${{ github.ref == 'refs/heads/main' || github.ref == 'refs/heads/staging' }}
|
||||
needs: [changes, canvas-build]
|
||||
steps:
|
||||
- name: Record canvas ordered-deploy status
|
||||
@@ -533,9 +532,8 @@ jobs:
|
||||
# The `needs:` list MUST stay in lockstep with ci-required-drift.py's
|
||||
# F1 check (`ci_job_names()` = every job MINUS the sentinel MINUS jobs
|
||||
# whose `if:` gates on github.event_name/github.ref). canvas-deploy-
|
||||
# reminder is event-gated (`if: github.ref == refs/heads/{main,staging}`)
|
||||
# so it is intentionally EXCLUDED — it skips on PRs and a `needs:` on a
|
||||
# skipped job would never let the sentinel run. If a new always-running
|
||||
# status is per-step-gated (not job-level `if:`) so it reaches SUCCESS
|
||||
# on PRs and is included here — internal#817. If a new always-running
|
||||
# CI job is added, add it here too or ci-required-drift F1 will flag it.
|
||||
#
|
||||
# Stays on the dedicated `ci-meta` lane (no docker work, so the
|
||||
@@ -549,6 +547,7 @@ jobs:
|
||||
- canvas-build
|
||||
- shellcheck
|
||||
- python-lint
|
||||
- canvas-deploy-status
|
||||
continue-on-error: false
|
||||
runs-on: ci-meta
|
||||
timeout-minutes: 5
|
||||
@@ -567,6 +566,7 @@ jobs:
|
||||
CANVAS_RESULT: ${{ needs.canvas-build.result }}
|
||||
SHELLCHECK_RESULT: ${{ needs.shellcheck.result }}
|
||||
PYTHON_LINT_RESULT: ${{ needs.python-lint.result }}
|
||||
CANVAS_DEPLOY_RESULT: ${{ needs.canvas-deploy-status.result }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
fail=0
|
||||
@@ -588,6 +588,7 @@ jobs:
|
||||
check "Canvas (Next.js)" "$CANVAS_RESULT"
|
||||
check "Shellcheck (E2E scripts)" "$SHELLCHECK_RESULT"
|
||||
check "Python Lint & Test" "$PYTHON_LINT_RESULT"
|
||||
check "Canvas Deploy Status" "$CANVAS_DEPLOY_RESULT"
|
||||
if [ "$fail" -ne 0 ]; then
|
||||
echo "::error::all-required: one or more aggregated CI jobs did not succeed"
|
||||
exit 1
|
||||
|
||||
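The sentinel job above passes only when every job in its `needs:` list reports success, which is why a job that would be skipped on PRs cannot be listed there. A minimal TypeScript sketch of that aggregation follows. The `check` shell helper is not shown in this hunk, so treating every non-`success` result (including `skipped`) as a failure is an assumption, and `JobResult` and `failedJobs` are hypothetical names.

```typescript
// One aggregated job: display name plus its `needs.<job>.result` value,
// which is one of "success", "failure", "cancelled", or "skipped".
interface JobResult {
  name: string;
  result: string;
}

// Returns the names of jobs that did not succeed. An empty list means the
// sentinel may pass. ASSUMPTION: anything other than "success" fails.
function failedJobs(results: JobResult[]): string[] {
  const failed: string[] = [];
  for (const job of results) {
    if (job.result !== "success") {
      failed.push(job.name);
    }
  }
  return failed;
}

const onPullRequest: JobResult[] = [
  { name: "Canvas (Next.js)", result: "success" },
  { name: "Shellcheck (E2E scripts)", result: "success" },
  { name: "Python Lint & Test", result: "success" },
  // With a job-level `if:` this would be "skipped" on PRs; the per-step
  // no-op described above lets it report "success" instead.
  { name: "Canvas Deploy Status", result: "success" },
];

const failures = failedJobs(onPullRequest);
console.log(failures.length === 0 ? "all-required: ok" : "failed: " + failures.join(", "));
```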
@@ -7,10 +7,13 @@ name: gitea-merge-queue
|
||||
# the user-space queue bot, one PR per tick, using the non-bypass merge actor.
|
||||
#
|
||||
# Queue contract:
|
||||
# - add label `merge-queue` to an open same-repo PR
|
||||
# - auto-discovery (default): any open same-repo PR is considered — no
|
||||
# `merge-queue` label required (the label is optional metadata now)
|
||||
# - bot updates stale PR heads with current main, then waits for CI
|
||||
# - bot merges only when current main is green and required PR contexts pass
|
||||
# - add `merge-queue-hold` to pause a queued PR without removing it
|
||||
# - bot merges only when current main is green, genuine approvals are present
|
||||
# on the current head, required PR contexts pass, and the PR is mergeable
|
||||
# - add `merge-queue-hold`, `do-not-auto-merge`, or `wip` to keep a PR OUT of
|
||||
# autonomous merging; draft PRs are also skipped
|
||||
|
||||
on:
|
||||
# Schedule moved to operator-config:
|
||||
@@ -48,6 +51,19 @@ jobs:
|
||||
WATCH_BRANCH: ${{ github.event.repository.default_branch }}
|
||||
QUEUE_LABEL: merge-queue
|
||||
HOLD_LABEL: merge-queue-hold
|
||||
# Auto-discovery (opt-OUT). When on (default), the queue considers ALL
|
||||
# open same-repo PRs that meet the merge bar — it does NOT wait for a
|
||||
# human/agent to add `merge-queue`. Agent Gitea tokens lack
|
||||
# write:issue (labels are issue-scoped) and could never self-label,
|
||||
# which stalled the queue; the label is now OPTIONAL metadata. The
|
||||
# merge bar is UNCHANGED — only candidate selection widens. Set
|
||||
# AUTO_DISCOVER=0 to restore legacy opt-IN (require the merge-queue
|
||||
# label to be considered).
|
||||
AUTO_DISCOVER: "1"
|
||||
# Opt-OUT labels: any of these on a PR keeps it OUT of autonomous
|
||||
# merging (the human escape hatch). HOLD_LABEL is always also honoured.
|
||||
# A human who wants a PR held just adds one of these labels.
|
||||
OPT_OUT_LABELS: do-not-auto-merge,wip
|
||||
UPDATE_STYLE: merge
|
||||
# Recognised official-reviewer set. A merge needs >= required_approvals
|
||||
# DISTINCT genuine official approvals from these accounts on the
|
||||
|
||||
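The queue contract above changes only which PRs are considered, not the merge bar. A rough TypeScript sketch of that candidate selection follows, under stated assumptions: the `PullRequest` and `QueueConfig` shapes are hypothetical, "oldest-first" is taken to mean oldest creation time, and the approval and CI checks that follow selection are left out.

```typescript
// Hypothetical, simplified view of an open pull request.
interface PullRequest {
  number: number;
  draft: boolean;
  sameRepo: boolean;   // head branch lives in the base repository
  labels: string[];
  createdAt: string;   // ISO-8601 UTC timestamp, assumed uniform in format
}

// Mirrors the workflow env block shown above.
interface QueueConfig {
  autoDiscover: boolean;   // AUTO_DISCOVER
  queueLabel: string;      // QUEUE_LABEL
  holdLabel: string;       // HOLD_LABEL
  optOutLabels: string[];  // OPT_OUT_LABELS
}

function hasLabel(pr: PullRequest, label: string): boolean {
  return pr.labels.indexOf(label) !== -1;
}

function isCandidate(pr: PullRequest, cfg: QueueConfig): boolean {
  if (!pr.sameRepo || pr.draft) return false;       // forks and drafts are skipped
  if (hasLabel(pr, cfg.holdLabel)) return false;    // HOLD_LABEL is always honoured
  for (const label of cfg.optOutLabels) {
    if (hasLabel(pr, label)) return false;          // human escape hatch
  }
  // Opt-out mode considers everything left; legacy opt-in needs the label.
  return cfg.autoDiscover || hasLabel(pr, cfg.queueLabel);
}

// Candidate PR numbers, oldest first.
function orderedCandidates(prs: PullRequest[], cfg: QueueConfig): number[] {
  const picked = prs.filter(function (pr) { return isCandidate(pr, cfg); });
  picked.sort(function (a, b) {
    return a.createdAt < b.createdAt ? -1 : a.createdAt > b.createdAt ? 1 : 0;
  });
  return picked.map(function (pr) { return pr.number; });
}

const queueCfg: QueueConfig = {
  autoDiscover: true,
  queueLabel: "merge-queue",
  holdLabel: "merge-queue-hold",
  optOutLabels: ["do-not-auto-merge", "wip"],
};

const openPrs: PullRequest[] = [
  { number: 1, draft: false, sameRepo: true, labels: [], createdAt: "2026-01-03T00:00:00Z" },
  { number: 2, draft: false, sameRepo: true, labels: ["wip"], createdAt: "2026-01-01T00:00:00Z" },
  { number: 3, draft: true, sameRepo: true, labels: [], createdAt: "2026-01-02T00:00:00Z" },
  { number: 4, draft: false, sameRepo: true, labels: ["merge-queue"], createdAt: "2026-01-02T12:00:00Z" },
  { number: 5, draft: false, sameRepo: false, labels: [], createdAt: "2026-01-01T06:00:00Z" },
];

console.log(orderedCandidates(openPrs, queueCfg).join(",")); // prints "4,1"
```

With `autoDiscover` set to `false`, the same input yields only PR 4, which is the legacy opt-in behaviour described for `AUTO_DISCOVER=0`.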
@@ -61,11 +61,9 @@ name: Lint pre-flip continue-on-error
|
||||
# feedback_no_shared_persona_token_use.
|
||||
#
|
||||
# Phase contract (RFC internal#219 §1 ladder):
|
||||
# - This workflow lands at `continue-on-error: true` (Phase 3 —
|
||||
# surface defects without blocking). Follow-up PR flips it to
|
||||
# `false` ONLY after this workflow's own recent runs on `main`
|
||||
# are confirmed clean — exactly the discipline the workflow
|
||||
# itself enforces. Eat your own dogfood.
|
||||
# - Flipped to `continue-on-error: false` after Researcher live-verified
|
||||
# clean runs. The script's own 35 pytest tests pass and recent PR
|
||||
# history shows no masked regressions — the gate is now enforcing.
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
@@ -97,10 +95,9 @@ jobs:
|
||||
name: Verify continue-on-error flips have run-log proof
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 8
|
||||
# Phase 3 (RFC internal#219 §1): surface broken flips without blocking
|
||||
# the PR yet. Follow-up flips this to `false` once the workflow itself
|
||||
# has clean recent runs on main. mc#1982 interim — remove when CoE→false.
|
||||
continue-on-error: true # mc#1982
|
||||
# Fail-closed: the lint script is verified clean (35/35 tests pass,
|
||||
# Researcher live-check confirmed). Masking removed per mc#1982 close-out.
|
||||
continue-on-error: false
|
||||
steps:
|
||||
- name: Check out PR head (full history for base-SHA access)
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
+51
-31
@@ -234,30 +234,44 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
"Authorization": `Bearer ${tenantToken}`,
|
||||
"X-Molecule-Org-Id": orgID,
|
||||
};
|
||||
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
|
||||
method: "POST",
|
||||
headers: tenantAuth,
|
||||
body: JSON.stringify({
|
||||
name: "E2E Canvas Test",
|
||||
runtime: "hermes",
|
||||
tier: 2,
|
||||
// Provider-registry SSOT (internal#718) registers ONLY Kimi models for
|
||||
// the hermes runtime — `moonshot/kimi-k2.6` is the platform-managed
|
||||
// entry (workspace-server/internal/providers/providers.yaml, hermes ->
|
||||
// platform). The old `gpt-4o` was never a registered hermes model and
|
||||
// now 422s UNREGISTERED_MODEL_FOR_RUNTIME (core#2225). This workspace
|
||||
// defaults closed to platform_managed (see the boot-shape note below),
|
||||
// so a platform-namespaced model id is the registry-correct choice.
|
||||
model: "moonshot/kimi-k2.6",
|
||||
}),
|
||||
});
|
||||
if (ws.status >= 400 || !ws.body?.id) {
|
||||
throw new Error(`Workspace create ${ws.status}: ${JSON.stringify(ws.body)}`);
|
||||
// Retry workspace creation on transient 5xx / timeout — staging CP can
|
||||
// return 502/503/504 under load and a single-shot failure kills the
|
||||
// entire E2E run. 3 attempts with exponential backoff between them (3s, then 6s)
|
||||
// give ~9s of total wait, well inside the 20-min provision envelope.
|
||||
let workspaceId = "";
|
||||
for (let attempt = 1; attempt <= 3; attempt++) {
|
||||
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
|
||||
method: "POST",
|
||||
headers: tenantAuth,
|
||||
body: JSON.stringify({
|
||||
name: "E2E Canvas Test",
|
||||
runtime: "hermes",
|
||||
tier: 2,
|
||||
// Provider-registry SSOT (internal#718) registers ONLY Kimi models for
|
||||
// the hermes runtime — `moonshot/kimi-k2.6` is the platform-managed
|
||||
// entry (workspace-server/internal/providers/providers.yaml, hermes ->
|
||||
// platform). The old `gpt-4o` was never a registered hermes model and
|
||||
// now 422s UNREGISTERED_MODEL_FOR_RUNTIME (core#2225). This workspace
|
||||
// defaults closed to platform_managed (see the boot-shape note below),
|
||||
// so a platform-namespaced model id is the registry-correct choice.
|
||||
model: "moonshot/kimi-k2.6",
|
||||
}),
|
||||
});
|
||||
if (ws.status >= 200 && ws.status < 300 && ws.body?.id) {
|
||||
workspaceId = ws.body.id as string;
|
||||
break;
|
||||
}
|
||||
const isTransient = ws.status >= 500 || ws.status === 0;
|
||||
if (!isTransient || attempt === 3) {
|
||||
throw new Error(`Workspace create ${ws.status} (attempt ${attempt}): ${JSON.stringify(ws.body)}`);
|
||||
}
|
||||
const backoff = 3000 * Math.pow(2, attempt - 1);
|
||||
console.log(`[staging-setup] Workspace create transient ${ws.status}, retrying in ${backoff}ms...`);
|
||||
await new Promise((r) => setTimeout(r, backoff));
|
||||
}
|
||||
const workspaceId = ws.body.id as string;
|
||||
console.log(`[staging-setup] Workspace created: ${workspaceId}`);
|
||||
|
||||
// 6. Wait for workspace RENDERABLE.
|
||||
// 6. Wait for workspace online
|
||||
//
|
||||
// This harness exists to verify the canvas *tab UI* renders (staging-
|
||||
// tabs.spec.ts: open each of the 13 workspace-panel tabs, assert no hard
|
||||
@@ -266,6 +280,16 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
// it needs is a workspace ROW that the canvas lists so the node renders
|
||||
// and the side-panel tabs open. A fully-`online` agent is NOT required.
|
||||
//
|
||||
// Hermes cold-boot takes 10-13 min on slow apt days (apt + uv + hermes
|
||||
// install + npm browser-tools). The controlplane bootstrap-watcher
|
||||
// deadline fires at 5 min and sets status=failed prematurely; heartbeat
|
||||
// then transitions failed → online after install.sh finishes. The ONLY
|
||||
// failed shape we tolerate is the pre-start credential-abort
|
||||
// (uptime_seconds=0, no last_sample_error) — the agent never ran. Real
|
||||
// boot regressions (image pull error, panic, PYTHONPATH, etc.) still
|
||||
// hard-throw immediately so triage gets detail without waiting for a
|
||||
// polling timeout. See test_staging_full_saas.sh step 7/11 and issue #2632.
|
||||
//
|
||||
// That distinction became load-bearing on 2026-06-03: workspace-server
|
||||
// #2162 (fix(provision): platform-managed workspace must fail-closed when
|
||||
// CP proxy env absent) made a platform_managed workspace ABORT AT BOOT
|
||||
@@ -287,8 +311,10 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
// the node + tabs render, proceed. We do NOT mask a real boot regression:
|
||||
// any `failed` carrying a last_sample_error, OR a non-zero uptime (the
|
||||
// agent started then crashed — image pull, panic, PYTHONPATH, etc.),
|
||||
// still hard-throws. Genuine *infra* provision failure is already caught
|
||||
// loud one step earlier at the org level (instance_status === "failed").
|
||||
// still hard-throws immediately so triage gets boot_stage / last_error /
|
||||
// image fields without waiting for a polling timeout.
|
||||
// Genuine *infra* provision failure is already caught loud one step
|
||||
// earlier at the org level (instance_status === "failed").
|
||||
await waitFor<boolean>(
|
||||
async () => {
|
||||
const r = await jsonFetch(`${tenantURL}/workspaces/${workspaceId}`, {
|
||||
@@ -315,13 +341,7 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
);
|
||||
return true;
|
||||
}
|
||||
// last_sample_error is often empty when the failure happens before
|
||||
// the agent emits a sample (e.g. boot crash, image pull error,
|
||||
// missing PYTHONPATH, OpenAI quota at startup). Dumping the full
|
||||
// body gives triage the boot_stage / last_error / image fields it
|
||||
// needs without a second probe. Otherwise this propagates as a
|
||||
// bare "Workspace failed: " — the exact useless message that
|
||||
// sent #2632 to the issue tracker.
|
||||
// Real boot regression — hard-throw immediately with full detail.
|
||||
const detail = sampleErr
|
||||
? sampleErr
|
||||
: `(no last_sample_error) full body: ${JSON.stringify(r.body)}`;
|
||||
@@ -333,7 +353,7 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
10_000,
|
||||
"workspace online",
|
||||
);
|
||||
console.log(`[staging-setup] Workspace renderable`);
|
||||
console.log(`[staging-setup] Workspace online`);
|
||||
|
||||
// 7. Hand state off to tests + teardown — overwrite the slug-only
|
||||
// bootstrap state with the full state spec tests need.
|
||||
|
||||
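The setup file above contains two decision rules: which workspace-create failures are retried, and which `failed` workspace shape is tolerated while polling. Both can be sketched as pure functions, which makes them easy to check in isolation. This is an illustrative sketch, not the harness code: `WorkspaceBody`, `backoffSchedule`, `classifyWorkspace`, and the `Verdict` values are hypothetical names, and reading status `0` as "no HTTP response" is an assumption about `jsonFetch`.

```typescript
// Assumed response shape; only fields named in the comments above are modelled.
interface WorkspaceBody {
  status?: string;
  uptime_seconds?: number;
  last_sample_error?: string;
}

// Retry only on 5xx or on status 0 (assumed to mean timeout / no response).
function isTransient(status: number): boolean {
  return status >= 500 || status === 0;
}

// Delays slept between attempts. The final attempt throws instead of
// sleeping, so N attempts produce N - 1 delays.
function backoffSchedule(maxAttempts: number, baseMs: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(baseMs * Math.pow(2, attempt - 1));
  }
  return delays;
}

type Verdict = "ready" | "tolerated" | "fatal" | "pending";

// "tolerated" is the pre-start credential-abort shape: failed, zero uptime,
// and no last_sample_error. Any other failed shape is a real boot regression.
function classifyWorkspace(body: WorkspaceBody): Verdict {
  if (body.status === "online") return "ready";
  if (body.status !== "failed") return "pending";
  const sampleErr = body.last_sample_error || "";
  const uptime = body.uptime_seconds || 0;
  return sampleErr === "" && uptime === 0 ? "tolerated" : "fatal";
}

const schedule = backoffSchedule(3, 3000);
let totalWaitMs = 0;
for (const delay of schedule) {
  totalWaitMs += delay;
}
console.log(schedule.join(",") + " total=" + totalWaitMs); // prints "3000,6000 total=9000"
```

Keeping the classification separate from the polling loop means the tolerated shape is stated in one place, so a change to it cannot silently widen what the harness accepts.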
@@ -172,7 +172,7 @@ export function ContextMenu() {
|
||||
const nodeId = contextMenu.nodeId;
|
||||
closeContextMenu();
|
||||
try {
|
||||
await api.post(`/workspaces/${nodeId}/pause`, {});
|
||||
await api.post(`/workspaces/${nodeId}/pause?cascade=true`, {});
|
||||
updateNodeData(nodeId, { status: "paused" });
|
||||
} catch (e) {
|
||||
showToast("Pause failed", "error");
|
||||
@@ -184,7 +184,7 @@ export function ContextMenu() {
|
||||
const nodeId = contextMenu.nodeId;
|
||||
closeContextMenu();
|
||||
try {
|
||||
await api.post(`/workspaces/${nodeId}/resume`, {});
|
||||
await api.post(`/workspaces/${nodeId}/resume?cascade=true`, {});
|
||||
updateNodeData(nodeId, { status: "provisioning" });
|
||||
} catch (e) {
|
||||
showToast("Resume failed", "error");
|
||||
|
||||
@@ -385,7 +385,7 @@ describe("ContextMenu — item actions", () => {
|
||||
render(<ContextMenu />);
|
||||
fireEvent.click(screen.getByRole("menuitem", { name: /pause/i }));
|
||||
await act(async () => { /* flush */ });
|
||||
expect(mockPost).toHaveBeenCalledWith("/workspaces/n1/pause", {});
|
||||
expect(mockPost).toHaveBeenCalledWith("/workspaces/n1/pause?cascade=true", {});
|
||||
expect(mockStoreState.updateNodeData).toHaveBeenCalledWith("n1", { status: "paused" });
|
||||
});
|
||||
|
||||
@@ -395,7 +395,7 @@ describe("ContextMenu — item actions", () => {
|
||||
render(<ContextMenu />);
|
||||
fireEvent.click(screen.getByRole("menuitem", { name: /resume/i }));
|
||||
await act(async () => { /* flush */ });
|
||||
expect(mockPost).toHaveBeenCalledWith("/workspaces/n1/resume", {});
|
||||
expect(mockPost).toHaveBeenCalledWith("/workspaces/n1/resume?cascade=true", {});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -324,7 +324,7 @@ export const useCanvasStore = create<CanvasState>((set, get) => ({
|
||||
batchPause: async () => {
|
||||
const ids = Array.from(get().selectedNodeIds);
|
||||
const results = await Promise.allSettled(
|
||||
ids.map((id) => api.post(`/workspaces/${id}/pause`))
|
||||
ids.map((id) => api.post(`/workspaces/${id}/pause?cascade=true`))
|
||||
);
|
||||
const failed: string[] = [];
|
||||
results.forEach((r, i) => {
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
**Status:** living document — update when you ship a feature that touches one backend.
|
||||
**Owner:** workspace-server + controlplane teams.
|
||||
**Last audit:** 2026-05-07 (plugin install/uninstall closed for EC2 backend via EIC SSH push to the bind-mounted `/configs/plugins/<name>/`, mirroring the Files API PR #1702 pattern).
|
||||
**Last audit:** 2026-05-31 (Claude agent — drift risk #6 verified resolved; nil guards present, contract tests run without Skip).
|
||||
|
||||
## Why this exists
|
||||
|
||||
@@ -93,12 +93,12 @@ For "do we have any backend?", use `HasProvisioner()`, never bare `h.provisioner
|
||||
3. **Restart divergence on runtime changes.** Docker re-reads `/configs/config.yaml` from the container before stop, so a changed `runtime:` survives a restart even if the DB isn't synced. EC2 trusts the DB only. If you change the runtime via the Config tab and the handler races the restart, Docker will land on the new runtime, EC2 will land on the old one. **Fix path:** make the Config-tab save explicitly flush to DB before kicking off a restart, not deferred.
|
||||
4. **Console-output asymmetry.** Users debugging a stuck workspace on Docker see `docker logs`; on EC2 they see `GetConsoleOutput`. The two outputs look nothing alike. **Fix path:** expose a unified `GET /workspaces/:id/boot-log` that proxies to whichever backend serves the data. Already partly there via `cp_provisioner.Console`.
|
||||
5. **Template script drift.** `install.sh` and `start.sh` in each template repo do the same high-level work (install hermes-agent, write .env, write config.yaml, start gateway) but must be kept byte-level consistent on the provider-key forwarding block. Easy to forget. Enforced now by `tools/check-template-parity.sh` (see below) — run it in each template repo's CI.
|
||||
6. **Both backends panic when underlying client is nil.** ✅ **Resolved** (`fix/provisioner-nil-guards-1813`). `Provisioner.{Stop,IsRunning}` and `CPProvisioner.{Stop,IsRunning}` now guard against nil clients with `ErrNoBackend`, so the contract-test runner executes scenarios against zero-valued backends without panic.
|
||||
6. ~~**Both backends panic when underlying client is nil.**~~ **RESOLVED** — nil guards landed in `Provisioner` (`Start`, `Stop`, `IsRunning`, `ExecRead`, `RemoveVolume`, `VolumeHasFile`, `WriteAuthTokenToVolume`) and `CPProvisioner` (`Stop`, `IsRunning`), all returning `ErrNoBackend`. Contract tests (`TestDockerBackend_Contract`, `TestCPProvisionerBackend_Contract`, `TestZeroValuedBackends_NoPanic`) run in CI without `t.Skip`.
|
||||
|
||||
## Enforcement
|
||||
|
||||
- **`tools/check-template-parity.sh`** (this repo) — ensures `install.sh` and `start.sh` in a template repo forward identical sets of provider keys. Wire into each template repo's CI as `bash $MONOREPO/tools/check-template-parity.sh install.sh start.sh`.
|
||||
- **Contract tests** — `workspace-server/internal/provisioner/backend_contract_test.go` defines the behaviors every `provisioner.Provisioner` implementation must satisfy. Fails compile when a method drifts between `Docker` and `CPProvisioner`. Scenario-level runs execute against zero-valued backends since drift risk #6 was resolved (`fix/provisioner-nil-guards-1813`).
|
||||
- **Contract tests** — `workspace-server/internal/provisioner/backend_contract_test.go` defines the behaviors every `provisioner.Provisioner` implementation must satisfy. Fails compile when a method drifts between `Docker` and `CPProvisioner`. Scenario-level runs (`TestDockerBackend_Contract`, `TestCPProvisionerBackend_Contract`, `TestZeroValuedBackends_NoPanic`) execute in CI — drift risk #6 resolved.
|
||||
- **Source-level dispatcher pins** — `workspace_provision_auto_test.go` enforces the SoT pattern documented above:
|
||||
- `TestNoCallSiteCallsDirectProvisionerExceptAuto` — no handler calls `.provisionWorkspace(` or `.provisionWorkspaceCP(` directly outside the dispatcher's allowlist.
|
||||
- `TestNoCallSiteCallsBareStop` — no handler calls `.provisioner.Stop(` or `.cpProv.Stop(` directly outside the dispatcher's allowlist (strips Go comments before substring match so archaeology in code comments doesn't trip the gate).
|
||||
|
||||
@@ -0,0 +1,293 @@
|
||||
# RFC: Org-level Platform Agent — a tenant-resident concierge
|
||||
|
||||
**Perspective:** CTO + Backend Engineer + DevOps
|
||||
**Status:** Draft — pre-implementation, **CTO sign-off required before any implementation PR**
|
||||
**Scope:** `molecule-core` (workspace-server), `molecule-controlplane`, workspace runtime, `molecule-app`
|
||||
**This document is the single source of truth (SSOT) for the feature.** Code, OpenAPI, the platform
|
||||
MCP, and end-user docs reconcile to this RFC — not to each other.
|
||||
|
||||
---
|
||||
|
||||
## 1. Summary
|
||||
|
||||
Today a Molecule tenant is a control/router box: one EC2 runs the `workspace-server`
|
||||
(`molecule-tenant` container) + Postgres + Redis, and **each workspace is its own separate EC2**
|
||||
running a runtime image that joins the tenant's A2A mesh. A2A has exactly two participant kinds:
|
||||
**workspaces** (agents) and the **user** (the canvas, modeled implicitly as `activity_logs.source_id
|
||||
IS NULL`). A user who wants to *do* anything must drive individual workspaces directly — create them,
|
||||
assign agents, wire channels/schedules/secrets — i.e. they must carry a lot of platform knowledge.
|
||||
|
||||
This RFC introduces a **platform agent**: an always-on org-level agent that
|
||||
|
||||
1. runs as a **container on the tenant EC2** itself (beside `molecule-tenant`),
|
||||
2. natively holds the **platform-management MCP** (the org-admin tool surface) so it can do anything
|
||||
in the org,
|
||||
3. joins A2A as a **first-class third participant** (`kind='platform'`) that sits at the org root, and
|
||||
4. becomes the **user's default chat target** — a concierge the user talks to like a chatbot, which
|
||||
then orchestrates the org on their behalf.
|
||||
|
||||
Destructive actions the concierge triggers are **human-approved** through the existing approvals
|
||||
subsystem.
|
||||
|
||||
## 2. Motivation
|
||||
|
||||
- **Lower the knowledge floor.** "Spin up an SEO team and have them publish weekly" should be a
|
||||
sentence, not a sequence of workspace/agent/schedule/secret operations.
|
||||
- **One front door.** A single conversational entry point that *is* the org, instead of N per-workspace
|
||||
chats the user has to coordinate.
|
||||
- **Reuse, don't rebuild.** The agent runtime, A2A mesh, the 87-tool platform MCP, and the approvals
|
||||
subsystem already exist. This feature is mostly *composition* plus one honest new participant kind.
|
||||
|
||||
## 3. Goals / Non-Goals
|
||||
|
||||
**Goals**
|
||||
- A per-tenant platform agent, provisioned automatically, that controls the org via the platform MCP.
|
||||
- A first-class `platform` participant in A2A with correct routing and tenant isolation.
|
||||
- Server-side approval gating for destructive org operations.
|
||||
- Parity with normal workspaces for runtime/model/provider/billing (no special-casing).
|
||||
|
||||
**Non-Goals (this RFC)**
|
||||
- Replacing the canvas. The canvas remains the advanced/power-user surface.
|
||||
- Multi-concierge / per-team concierges. Exactly **one** platform agent per org.
|
||||
- A new scoped-down token system for the MCP (tracked separately; see §10 Open Questions).
|
||||
|
||||
## 4. Current-state ground truth (verified, with references)
|
||||
|
||||
- **Topology.** Tenant EC2 runs `molecule-tenant` (workspace-server) + Postgres + Redis;
|
||||
`controlplane/internal/provisioner/ec2.go:buildTenantUserDataSM()` `docker run`s it with
|
||||
`--network host`, `PORT=8080`. Each **workspace is its own EC2** (`ec2.go:ProvisionWorkspace`).
|
||||
- **No `org_id` column.** An "org" is the `parent_id IS NULL` subtree root;
|
||||
`workspace-server/internal/handlers/org_scope.go` resolves it with a recursive CTE (`orgRootID`) and
|
||||
`sameOrg()` compares two workspaces' resolved roots for tenant isolation (#1953/OFFSEC-015).
|
||||
- **A2A authorization is hierarchy-based.** `workspace-server/internal/registry/access.go:CanCommunicate`
|
||||
permits self / siblings / ancestor↔descendant. Root-level rows are "siblings" but every routing path
|
||||
is additionally gated by `sameOrg()`.
|
||||
- **No participant-kind discriminator.** `workspaces.role` is a free-form string; the user is implicit
|
||||
(`activity_logs.source_id IS NULL`). `migrations/001_workspaces.sql`.
|
||||
- **Runtime injects MCP servers** in the claude-code executor's `mcp_servers` dict — today exactly one
|
||||
entry, `"a2a"` (`molecule-ai-workspace-template-claude-code/claude_sdk_executor.py`,
|
||||
`molecule_runtime/claude_sdk_executor.py`). The agent self-registers via `POST /registry/register`
|
||||
(`molecule_runtime/main.py`) and is identified by `WORKSPACE_ID` + `X-Molecule-Org-Id`.
|
||||
- **Platform MCP** (`molecule-mcp-server`, stdio Node) authenticates purely from env
|
||||
(`MOLECULE_API_KEY` = org-admin token, `MOLECULE_API_URL`, `MOLECULE_ORG_ID`; `src/api.ts`), is a
|
||||
thin proxy over the tenant REST/A2A API (`chat_with_agent` → `POST /workspaces/:id/a2a`,
|
||||
`async_delegate` → `/delegate`), and has **zero embeddability blockers**.
|
||||
- **Billing** is a per-workspace resolver — `ResolveLLMBillingModeDerived`
|
||||
(`workspace-server/internal/handlers/workspace_provision.go`, `llm_billing_mode.go`), defaulting
|
||||
closed to `platform_managed`; `byok` runs on the tenant's own provider key (see
|
||||
`docs/architecture/byok-fail-closed-billing.md`).
|
||||
- **Approvals** exist: `migrations/007_approvals.sql`, `internal/handlers/approvals.go`,
|
||||
`EventApprovalRequested`, decide route `POST /workspaces/:id/approvals/:approvalId/decide`.
|
||||
|
||||
## 5. Design
|
||||
|
||||
### 5.1 The platform agent IS the org root
|
||||
|
||||
Because `sameOrg()` resolves each workspace to its topmost `parent_id IS NULL` root, a platform agent
|
||||
added as a *second* root would resolve to a *different* root than the existing team and be **blocked**
|
||||
by `sameOrg`. Therefore the platform agent **becomes the single org root**, and the org's existing
|
||||
root is **re-parented under it**. Consequences:
|
||||
|
||||
- `orgRootID(any workspace) == platform-agent-id`; `sameOrg(platform, any in-org ws) == true`.
|
||||
- The platform agent reaches every workspace (and is reachable) via the **existing**
|
||||
ancestor↔descendant rules — **no `CanCommunicate` change**, and tenant isolation is unchanged.
|
||||
|
||||
This is the honest realization of "a third participant above workspace and user": the concierge is
|
||||
literally the org.
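
The resolution argument above can be sketched in a few lines. This is a hypothetical in-memory model, not the recursive CTE in `org_scope.go`: `org_root_id` and `same_org` are illustrative stand-ins for `orgRootID`/`sameOrg`, and the workspace ids are made up.

```python
from typing import Dict, Optional

# Hypothetical in-memory stand-in for the workspaces table: id -> parent_id.
Parents = Dict[str, Optional[str]]


def org_root_id(parents: Parents, ws_id: str) -> str:
    """Walk parent_id links up to the row whose parent_id is None."""
    seen = set()
    current = ws_id
    while parents[current] is not None:
        if current in seen:
            raise ValueError(f"parent cycle at {current}")
        seen.add(current)
        current = parents[current]
    return current


def same_org(parents: Parents, a: str, b: str) -> bool:
    """Two workspaces share an org iff they resolve to the same root."""
    return org_root_id(parents, a) == org_root_id(parents, b)


# Platform agent added as a second root: it resolves to a different root.
two_roots: Parents = {"team-root": None, "eng": "team-root", "platform": None}

# Platform agent as the single root, with the old root re-parented under it.
reparented: Parents = {"platform": None, "team-root": "platform", "eng": "team-root"}
```

With `two_roots`, `same_org(two_roots, "platform", "eng")` is false, which is the blocked case; with `reparented` every workspace resolves to `platform`.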
|
||||
|
||||
### 5.2 `kind` discriminator (the only new marker)
|
||||
|
||||
Add a single column `workspaces.kind TEXT NOT NULL DEFAULT 'workspace'`, constrained to
|
||||
`('workspace','platform')`. It is the **only** marker of the platform agent — we do **not** also
|
||||
encode identity in `role`/`tier` (those stay descriptive). The enum is defined once: the migration
|
||||
`CHECK` and the Go constants `KindWorkspace`/`KindPlatform` (+ one `IsValidKind`) are kept in lockstep.
|
||||
|
||||
Invariants (handler-enforced, since there is no `org_id` for a pure-SQL unique):
|
||||
- `kind='platform' ⇒ parent_id IS NULL`.
|
||||
- A row may be `kind='platform'` only if it is its own org root (`orgRootID(self) == self`), giving
|
||||
"exactly one platform agent per org". Guard the check+write in a tx with `FOR UPDATE` on the root.
|
||||
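
A minimal sketch of the invariant check, assuming it runs inside the handler before the write. `check_kind_invariants` and `VALID_KINDS` are hypothetical names (the RFC names only the Go constants and `IsValidKind`), and the transaction with `FOR UPDATE` is deliberately left out.

```python
from typing import Optional

# Mirrors the two values allowed by the CHECK constraint in the RFC.
VALID_KINDS = ("workspace", "platform")


def is_valid_kind(kind: str) -> bool:
    return kind in VALID_KINDS


def check_kind_invariants(kind: str, parent_id: Optional[str]) -> None:
    """Raise ValueError if the (kind, parent_id) pair violates the invariants."""
    if not is_valid_kind(kind):
        raise ValueError(f"unknown kind: {kind!r}")
    # A platform row must be its own org root, i.e. have no parent.
    if kind == "platform" and parent_id is not None:
        raise ValueError("kind='platform' requires parent_id IS NULL")
```

Because an org is defined by its single `parent_id IS NULL` root, restricting `platform` to root rows is what bounds it to one per org.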
|
||||
### 5.3 Identity & registration
|
||||
|
||||
- **ID** = derived `uuidv5(org-namespace, "platform-agent")` — reproducible, no stored-vs-derived
|
||||
drift, lowercase so it satisfies the runtime's `WORKSPACE_ID` validator.
|
||||
- CP **pre-seeds** the `workspaces` row (`kind='platform'`, `parent_id=NULL`, `tier=0`) before the
|
||||
agent boots; the agent self-registers (`POST /registry/register`) into that row. `Register` accepts
|
||||
an optional `kind` and reconciles it, enforcing the §5.2 invariants.
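
The derived id can be illustrated with the standard library. This sketch assumes the namespace is a per-org UUID; which UUID actually serves as `org-namespace` is not specified here, and `platform_agent_id` is a hypothetical helper name.

```python
import uuid


def platform_agent_id(org_namespace: uuid.UUID) -> str:
    """Derive the platform agent's id from the org namespace (UUIDv5)."""
    # str(UUID) is always lowercase hex with hyphens.
    return str(uuid.uuid5(org_namespace, "platform-agent"))


# Placeholder namespace for illustration only.
example_namespace = uuid.UUID("12345678-1234-5678-1234-567812345678")
example_id = platform_agent_id(example_namespace)
```

Calling the helper twice with the same namespace yields the same string, which is why nothing needs to be stored alongside the row.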
|
||||
|
||||
### 5.4 Default-target resolver
|
||||
|
||||
New `GET /registry/platform-agent` (handler `internal/handlers/platform_agent.go`): resolve the
|
||||
caller's `orgRootID()` and return it iff `kind='platform'`. This is the server hook the dashboard
|
||||
targets by default; no change to `ProxyA2A`. **Authored in the OpenAPI SSOT first**; MCP/CLI/docs
|
||||
derive from it.
|
||||
|
||||
### 5.5 Runtime: two MCPs, config-driven
|
||||
|
||||
Make the runtime's `mcp_servers` **config-driven** rather than hardcoded:
|
||||
- `molecule_runtime/config.py`: add `extra_mcp_servers: list[dict]` to `WorkspaceConfig`, read
|
||||
`raw.get("mcp_servers", [])`.
|
||||
- Both executors merge `extra_mcp_servers` into the `mcp_servers` dict after the always-on `"a2a"`
|
||||
entry (the template `claude_sdk_executor.py` is the live one; the runtime-package copy is the
|
||||
fallback).
|
||||
|
||||
The platform agent's `config.yaml` then declares:
|
||||
|
||||
```yaml
runtime: claude-code
model: sonnet  # default; user-switchable model AND provider via providers.yaml
a2a:
  port: 8090  # avoid the workspace default 8000 under host networking
mcp_servers:
  - name: platform
    command: node
    args: ["/opt/molecule-mcp-server/dist/index.js"]
```
|
||||
|
||||
The `platform` MCP reads `MOLECULE_API_KEY`/`MOLECULE_API_URL`/`MOLECULE_ORG_ID` from the container
|
||||
env (passed through to the stdio child) — no per-server `env` block needed.
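
The merge step might look roughly like the following. This is a sketch under assumptions: `build_mcp_servers` is a hypothetical helper, the shape of the `"a2a"` entry is a placeholder, and rejecting duplicate names is a design choice of this example rather than something the RFC specifies.

```python
from typing import Any, Dict, List


def build_mcp_servers(
    a2a_entry: Dict[str, Any],
    extra_mcp_servers: List[Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Start from the always-on "a2a" entry, then add config-declared servers."""
    servers: Dict[str, Dict[str, Any]] = {"a2a": a2a_entry}
    for entry in extra_mcp_servers:
        name = entry.get("name")
        if not name:
            raise ValueError("mcp_servers entry is missing 'name'")
        if name in servers:
            raise ValueError(f"duplicate MCP server name: {name}")
        # Everything except the name becomes the server's launch spec.
        servers[name] = {k: v for k, v in entry.items() if k != "name"}
    return servers


# Parsed config.yaml as a plain dict (what raw.get("mcp_servers", []) reads).
raw = {
    "runtime": "claude-code",
    "mcp_servers": [
        {
            "name": "platform",
            "command": "node",
            "args": ["/opt/molecule-mcp-server/dist/index.js"],
        }
    ],
}
servers = build_mcp_servers({"command": "a2a-placeholder"}, raw.get("mcp_servers", []))
```

A workspace whose config has no `mcp_servers` key ends up with only the `"a2a"` entry, so ordinary workspaces behave as before.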
|
||||
|
||||
### 5.6 Hosting & provisioning (tenant EC2 container)
|
||||
|
||||
In `ec2.go:buildTenantUserDataSM()` add a `start_platform_agent` stage **after** `wait_platform_health`
|
||||
(the agent registers against `localhost:8080` on boot):
|
||||
|
||||
```bash
docker run -d --restart=always --name molecule-platform-agent --network host \
  -v /data/platform-agent/configs:/configs \
  -e WORKSPACE_ID=<platform-uuid> -e WORKSPACE_CONFIG_PATH=/configs \
  -e PLATFORM_URL=http://localhost:8080 \
  -e MOLECULE_API_URL=http://localhost:8080 -e MOLECULE_API_KEY=$ADMIN_TOKEN -e MOLECULE_ORG_ID=<orgID> \
  -e ANTHROPIC_AUTH_TOKEN=$ADMIN_TOKEN -e MOLECULE_LLM_ANTHROPIC_BASE_URL=$MOLECULE_LLM_ANTHROPIC_BASE_URL \
  <platform-agent-image>
```
|
||||
|
||||
- The org `admin_token` is already on the box (Secrets Manager `molecule/tenant/{orgID}`).
|
||||
- `--restart=always` provides Docker-level supervision (matches `molecule-tenant`).
|
||||
- Mirror the block into the redeploy path (`buildRedeployScript`) so existing tenants backfill it.
|
||||
|
||||
### 5.7 Image
|
||||
|
||||
A **dedicated `molecule-platform-agent` image**: `FROM workspace-template-claude-code`, `COPY` the
|
||||
prebuilt `molecule-mcp-server/dist` + `node_modules` into `/opt/molecule-mcp-server`, and **pin Node
|
||||
20** (the slim base ships Node 18; the MCP expects ≥20). A dedicated image keeps the org-admin MCP
|
||||
**out of** ordinary workspace images (security hygiene) and lets us set concierge defaults without
|
||||
touching the workspace template. `molecule-ci` publishes it.
|
||||
|
||||
### 5.8 Approval gate (server-side trust boundary)
|
||||
|
||||
The MCP is a *client* of the tenant handlers, so enforcement lives in the **handlers**, not the MCP.
|
||||
|
||||
- `internal/approvals/policy.go` (new): one auditable map of gated actions:
  `delete_workspace`, `deprovision_workspace`, `secret_write`, `org_token_mint`.
- `requireApproval(ctx, workspaceID, action, contextHash)` reuses the existing approvals
  INSERT/broadcast/escalate. If an `approved`+unconsumed row matches → consume it → proceed. Else
  create a `pending` row, broadcast `EventApprovalRequested`, and return **HTTP 202
  `{approval_id, status:"pending_approval"}`** instead of executing. The human decides via the
  existing decide route; the agent retries and the gate now passes.
- Add `approval_requests.consumed_at` (single-use) and optional `request_hash` (dedupe identical
  pending requests).
- **Escalation:** the platform agent's `parent_id` is NULL, so platform-originated approvals escalate
  to the **user** (canvas notify), not a parent.
- The 202 response shape is authored in the **OpenAPI SSOT**.
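
The gate's lifecycle (pending, deduplicated, approved, consumed once) can be modelled in memory. This is a sketch only: `ApprovalStore`, `Approval`, and `request_hash` are hypothetical names, the list stands in for the `approval_requests` table, and the digest is not claimed to be byte-compatible with the server's.

```python
import hashlib
import json
from dataclasses import dataclass, field
from typing import List, Tuple


def request_hash(workspace_id: str, action: str, context: dict) -> str:
    """Stable digest of the operation; sorted keys make it order-independent."""
    payload = json.dumps(context, sort_keys=True, separators=(",", ":"))
    raw = "\x00".join([workspace_id, action, payload])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


@dataclass
class Approval:
    id: int
    key: str
    status: str = "pending"  # pending | approved | denied
    consumed: bool = False


@dataclass
class ApprovalStore:
    rows: List[Approval] = field(default_factory=list)

    def require_approval(self, workspace_id: str, action: str, context: dict) -> Tuple[bool, int]:
        """Return (True, id) after consuming an approval, else (False, pending id)."""
        key = request_hash(workspace_id, action, context)
        for row in self.rows:
            if row.key == key and row.status == "approved" and not row.consumed:
                row.consumed = True  # single-use
                return True, row.id
        for row in self.rows:
            if row.key == key and row.status == "pending":
                return False, row.id  # reuse instead of flooding the table
        row = Approval(id=len(self.rows) + 1, key=key)
        self.rows.append(row)
        return False, row.id

    def decide(self, approval_id: int, approved: bool) -> None:
        for row in self.rows:
            if row.id == approval_id and row.status == "pending":
                row.status = "approved" if approved else "denied"
                return
        raise KeyError(approval_id)
```

A caller that gets `(False, id)` would answer with the 202 body; after a human approves, the retry gets `(True, id)` exactly once, and a later identical request opens a fresh pending row.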
|
||||
|
||||
### 5.9 Billing & model/provider parity
|
||||
|
||||
The platform agent is a `workspaces` row, so it inherits the one billing resolver and the
|
||||
`providers.yaml` runtime matrix unchanged:
|
||||
- **Default `platform_managed`** (metered CP proxy, billed to org credits) — the env wiring in §5.6.
|
||||
- **`byok`** = flip `/admin/workspaces/:id/llm-billing-mode` + supply the org's `ANTHROPIC_API_KEY`
|
||||
secret (workspace or global). Exposed as a provisioning flag so a tenant can choose at create time.
|
||||
- Model **and provider** are switchable (Claude, Kimi-for-coding, …) via the same dashboard
|
||||
model-switcher any workspace uses.
|
||||
|
||||
### 5.10 UX (summary; detailed in app RFC / Phase 5)
|
||||
|
||||
The **dashboard** (`molecule-app`) becomes the primary entry: a concierge chat (default-targeting the
|
||||
§5.4 resolver) plus a live org overview, with pending approvals surfaced inline. The **canvas** stays
|
||||
for advanced users. First UI version is produced in Claude Design and iterated before build.
|
||||
|
||||
## 6. SSOT mapping (derive, don't fork)
|
||||
|
||||
| Concern | Single source of truth | This RFC's rule |
|---|---|---|
| "The org" | `orgRootID()`/`sameOrg()` (`org_scope.go`) | platform agent *becomes* the root; no `org_id` column |
| Platform marker | `workspaces.kind` | `kind` only; never also `role`/`tier` |
| Model/provider | `providers.yaml` runtime matrix | concierge switches via the same registry |
| LLM billing | `ResolveLLMBillingModeDerived` | inherits the one resolver; no new path |
| Config/secrets delivery | tenant Secrets Manager bundle (`seedWorkspaceConfigSecret`) | no new S3 prefix / second store |
| Management API | OpenAPI spec | new endpoints authored there first; MCP/CLI/docs derive |
| Gated actions | `internal/approvals/policy.go` | one map |
| Platform-agent id | `uuidv5(org, "platform-agent")` | derived, never stored separately |
|
||||
|
||||
## 7. Security & blast radius
|
||||
|
||||
The concierge holds the org **admin token** (full tenant-root, self-minting) and is driven by
|
||||
end-user chat. Mitigations:
|
||||
- **Approval gate (§5.8)** must ship *with* the agent going user-facing, not after. Until then the
|
||||
agent is operator-only.
|
||||
- **Tenant isolation** is unchanged — every reach path still passes `sameOrg()`.
|
||||
- **MCP not in workspace images** (dedicated image, §5.7); the admin token lives only in the
|
||||
platform-agent container env on the tenant box.
|
||||
- **Token rotation:** the MCP reads env once at spawn → rotation =
  `docker restart molecule-platform-agent` (runbook item).
|
||||
- Future: a scoped-down org token (no delete/billing/member) — see §10.
|
||||
|
||||
## 8. Migration & rollout
|
||||
|
||||
Phase ordering is the rollout contract:
|
||||
- **Phase 0** (schema) ships and bakes before anything writes `kind`. Backward-compatible: every
|
||||
existing row defaults to `kind='workspace'`; the `CHECK` is added `NOT VALID` then validated.
|
||||
- **Phase 1 re-parenting backfill** is the one real watch-item. **Before** running it, audit whether
|
||||
any org-scoped table keys off the *root workspace id* (e.g. `org_api_tokens`, `org_plugin_allowlist`)
|
||||
versus the CP org UUID. If they reference the root workspace id, re-parenting changes "the root" and
|
||||
those refs must migrate too. The backfill is per-org, idempotent, and reversible.
|
||||
- New orgs get the platform agent from first boot; existing orgs backfill via
  `/admin/tenants redeploy` + a one-time re-parent migration.
|
||||
|
||||
## 9. Implementation phases
|
||||
|
||||
0. **Schema + model** (`molecule-core`): `kind` column + `approval_requests.consumed_at`; model field +
|
||||
constants; `Register` accepts/validates `kind` with invariants.
|
||||
1. **Platform-as-root + resolver** (`molecule-core` + CP): CP pre-seeds the platform row and creates
|
||||
teams under it; per-org re-parent backfill (after the §8 audit); `GET /registry/platform-agent`.
|
||||
2. **Config-driven two-MCP runtime** (runtime + claude-code template).
|
||||
3. **Image + tenant provisioning** (CP + image + `molecule-ci`): dedicated image; `start_platform_agent`
|
||||
in user-data + redeploy; config via the tenant Secrets Manager bundle; billing knob.
|
||||
4. **Approval gate** (`molecule-core`): policy map + `requireApproval` at destructive handlers; OpenAPI
|
||||
202 shape.
|
||||
5. **Dashboard concierge UX** (`molecule-app`): design-first, then build against the resolver.
|
||||
6. **Cleanup**: exclude the platform agent from billable counts; canvas visibility; rotation runbook.
|
||||
|
||||
## 10. Open questions
|
||||
|
||||
- **Scoped-down token.** Should the concierge hold a reduced-scope token (no delete/billing/member)
|
||||
instead of full admin + an approval gate? The token-scope system does not exist yet (`orgtoken`
|
||||
TODO). Recommendation: ship admin-token + approval gate now; add scope-down as a follow-up.
|
||||
- **Re-parenting vs. wrapper.** If product later wants a platform agent that is *not* the topological
|
||||
root, a `CanCommunicateWithKind` wrapper (guarded by `sameOrg`) is the alternative. Deferred —
|
||||
platform-as-root is lower-risk and needs zero access-control change.
|
||||
- **Canvas visibility** of the root concierge node (hide vs. show as the org anchor).
|
||||
|
||||
## 11. Verification (end-to-end on a staging tenant)
|
||||
|
||||
1. **Schema:** Phase-0 migrations applied; existing workspaces report `kind='workspace'`;
   `go test ./...` + `-tags=integration` green.
|
||||
2. **Provision:** redeploy a staging tenant; `docker ps` shows `molecule-platform-agent` healthy; its
|
||||
logs show a successful `/registry/register`.
|
||||
3. **Identity:** the platform row is `kind='platform'`, `parent_id IS NULL`; the former root now has
|
||||
`parent_id = <platform id>`; `GET /registry/platform-agent` returns it.
|
||||
4. **Reach:** chat the platform agent → it `list_workspaces` then `create_workspace` via the platform
|
||||
MCP and reports back via `send_message_to_user`.
|
||||
5. **Isolation:** it reaches every workspace in its org and **cannot** reach another tenant's
|
||||
workspace.
|
||||
6. **Approval gate:** `delete_workspace` → HTTP 202 pending + approval event; decide-approve →
|
||||
completes; a second delete with the same approval is rejected (consumed).
|
||||
7. Drive a real concierge flow ("spin up a PM + engineer to build X") and watch the delegation/activity
|
||||
ledger.
|
||||
|
||||
---
|
||||
|
||||
*Derived from a read-only multi-agent source audit of `molecule-core`, `molecule-controlplane`,
|
||||
`molecule-ai-workspace-runtime`, `molecule-ai-workspace-template-claude-code`, and
|
||||
`molecule-mcp-server`. No secret values recorded.*
|
||||
@@ -8,26 +8,39 @@ against the latest `main`.
|
||||
|
||||
## Queue Contract
|
||||
|
||||
Add the `merge-queue` label to an open PR when it is ready to merge.
|
||||
**Auto-discovery (opt-OUT, default).** You do NOT need to label a PR. The bot
|
||||
auto-discovers every open same-repo PR and merges any that meets the bar. The
|
||||
`merge-queue` label is now optional metadata, not a gate. This removed the
|
||||
historical autonomy gap: agent Gitea tokens lack `write:issue` (labels are
|
||||
issue-scoped), so agents could never self-label and ready PRs stalled.
|
||||
|
||||
To keep a PR OUT of autonomous merging, add an opt-OUT label:
|
||||
`merge-queue-hold`, `do-not-auto-merge`, or `wip`. Draft PRs are also skipped.
|
||||
|
||||
The bot processes one PR per tick:
|
||||
|
||||
1. Confirms `main` is green.
|
||||
2. Selects the oldest open PR carrying `merge-queue`.
|
||||
3. Skips PRs with `merge-queue-hold`.
|
||||
4. Rejects fork PRs because the queue may only update same-repo branches.
|
||||
5. If the PR head does not contain current `main`, calls Gitea's
|
||||
1. Confirms `main`'s branch-protection-required push contexts are green.
|
||||
2. Selects the oldest open same-repo PR that is NOT opt-out-labeled and NOT a
|
||||
draft (auto-discovery). With `AUTO_DISCOVER=0` it falls back to legacy
|
||||
opt-IN: only PRs carrying `merge-queue` are considered.
|
||||
3. Rejects fork PRs because the queue may only update same-repo branches.
|
||||
4. If the PR head does not contain current `main`, calls Gitea's
|
||||
`/pulls/{n}/update?style=merge` endpoint and waits for CI on the new head.
|
||||
6. Merges only after the current PR head has required contexts green:
|
||||
- `CI / all-required (pull_request)`
|
||||
- `sop-checklist / all-items-acked (pull_request)`
|
||||
5. Merges only when, on the PR's CURRENT head sha:
|
||||
- `>= required_approvals` distinct genuine official `APPROVED` reviews from
|
||||
the recognised reviewer set (read from branch protection; default 2),
|
||||
- no open official `REQUEST_CHANGES`,
|
||||
- every branch-protection-required status context is green, and
|
||||
- the PR is `mergeable` (Gitea returns `True`; `None`/`False` = wait).
|
||||
|
||||
The workflow is serialized with `concurrency`, so two queued PRs cannot be
|
||||
The merge bar is unchanged by auto-discovery — only WHICH PRs are considered
|
||||
changes. The workflow is serialized with `concurrency`, so two PRs cannot be
|
||||
merged against the same observed `main`.
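
The selection rule and the merge bar can be sketched as two small functions. This is an illustration, not the code in `gitea-merge-queue.py`: `PullRequest`, `pick_candidate`, and `meets_merge_bar` are hypothetical names, and "oldest" is assumed to mean earliest creation time given as uniformly formatted ISO-8601 strings.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Optional

# Opt-out labels named in this document.
OPT_OUT_LABELS = frozenset({"merge-queue-hold", "do-not-auto-merge", "wip"})


@dataclass(frozen=True)
class PullRequest:
    number: int
    created_at: str  # ISO-8601, same format for all PRs, so it sorts as text
    labels: FrozenSet[str] = frozenset()
    draft: bool = False
    from_fork: bool = False


def pick_candidate(
    prs: Iterable[PullRequest],
    auto_discover: bool = True,
    queue_label: str = "merge-queue",
) -> Optional[PullRequest]:
    """Oldest same-repo, non-draft PR that is not opted out."""
    eligible = []
    for pr in prs:
        if pr.draft or pr.from_fork:
            continue
        if pr.labels & OPT_OUT_LABELS:
            continue
        # Legacy opt-in mode: the queue label is a gate again.
        if not auto_discover and queue_label not in pr.labels:
            continue
        eligible.append(pr)
    return min(eligible, key=lambda pr: pr.created_at, default=None)


def meets_merge_bar(
    approvals: int,
    required_approvals: int,
    open_change_requests: int,
    contexts_green: bool,
    mergeable: Optional[bool],
) -> bool:
    """All four conditions must hold; mergeable None or False means wait."""
    return (
        approvals >= required_approvals
        and open_change_requests == 0
        and contexts_green
        and mergeable is True
    )
```

Switching `auto_discover` changes only which PRs `pick_candidate` returns; `meets_merge_bar` is the same in both modes.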
|
||||
|
||||
## Operator Commands
|
||||
|
||||
Queue a PR:
|
||||
Queue a PR (optional — auto-discovery already considers every ready PR; the
|
||||
label is just visible metadata):
|
||||
|
||||
```bash
|
||||
curl -fsS -X POST \
|
||||
@@ -37,7 +50,8 @@ curl -fsS -X POST \
|
||||
-d '{"labels":["merge-queue"]}'
|
||||
```
|
||||
|
||||
Temporarily hold a queued PR:
|
||||
Keep a PR OUT of autonomous merging (opt-OUT — use `merge-queue-hold`,
|
||||
`do-not-auto-merge`, or `wip`):
|
||||
|
||||
```bash
|
||||
curl -fsS -X POST \
|
||||
@@ -56,9 +70,11 @@ REPO=molecule-ai/molecule-core \
|
||||
WATCH_BRANCH=main \
|
||||
QUEUE_LABEL=merge-queue \
|
||||
HOLD_LABEL=merge-queue-hold \
|
||||
AUTO_DISCOVER=1 \
|
||||
OPT_OUT_LABELS=do-not-auto-merge,wip \
|
||||
REVIEWER_SET=agent-reviewer,agent-researcher,agent-reviewer-cr2 \
|
||||
UPDATE_STYLE=merge \
|
||||
REQUIRED_CONTEXTS='CI / all-required (pull_request),sop-checklist / all-items-acked (pull_request)' \
|
||||
python3 .gitea/scripts/gitea-merge-queue.py
|
||||
python3 .gitea/scripts/gitea-merge-queue.py --dry-run
|
||||
```
|
||||
|
||||
Dry run:
|
||||
|
||||
@@ -1050,12 +1050,13 @@ def test_reap_continues_on_per_sha_apierror(sr_module, monkeypatch, capsys):
|
||||
|
||||
|
||||
def test_main_soft_skips_when_commit_listing_times_out(sr_module, monkeypatch, capsys):
|
||||
"""A transient outage while listing recent commits should not paint main red.
|
||||
"""A transient outage while listing recent commits fails the tick visibly.
|
||||
|
||||
Per-SHA status read failures are already isolated inside `reap_branch`.
|
||||
The real 2026-05-14 failure was earlier: `/commits?sha=main&limit=30`
|
||||
timed out after all retries, aborting the tick. The next 5-minute tick can
|
||||
retry safely, so `main()` should emit an observable warning and return 0.
|
||||
retry safely, but the tick itself must be observable as red (exit 1 + error
|
||||
annotation) so the cron bot alerts on persistent infra issues.
|
||||
"""
|
||||
|
||||
monkeypatch.setattr(sr_module, "scan_workflows", lambda _: {"workflow-without-push": False})
|
||||
@@ -1068,9 +1069,9 @@ def test_main_soft_skips_when_commit_listing_times_out(sr_module, monkeypatch, c
|
||||
monkeypatch.setattr(sr_module, "list_recent_commit_shas", fake_list_recent_commit_shas)
|
||||
monkeypatch.setattr(sys, "argv", ["status-reaper.py"])
|
||||
|
||||
assert sr_module.main() == 0
|
||||
assert sr_module.main() == 1
|
||||
captured = capsys.readouterr()
|
||||
assert "::warning::status-reaper skipped this tick" in captured.out
|
||||
assert "::error::status-reaper cannot run" in captured.out
|
||||
assert '"skipped": true' in captured.out
|
||||
assert '"skip_reason": "commit-list-api-error"' in captured.out
|
||||
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
// Package approvals holds the single source of truth for which destructive
|
||||
// org operations require a human approval before they execute.
|
||||
//
|
||||
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
//
|
||||
// The org-level platform agent is driven by end-user chat and holds an org-admin
|
||||
// token, so destructive/irreversible operations it can trigger are gated: the
|
||||
// handler creates a pending approval and returns it instead of executing, and a
|
||||
// human decides via the existing approvals subsystem. Keeping the gated-action
|
||||
// list in ONE map makes the blast-radius boundary auditable in a single place —
|
||||
// a handler not listed here behaves exactly as before.
|
||||
package approvals
|
||||
|
||||
// Action is the canonical identifier of a gated destructive operation. The same
|
||||
// string is stored in approval_requests.action so the gate can match a pending/
|
||||
// approved request to the operation being retried.
|
||||
type Action string
|
||||
|
||||
const (
|
||||
ActionDeleteWorkspace Action = "delete_workspace"
|
||||
ActionDeprovision Action = "deprovision_workspace"
|
||||
ActionSecretWrite Action = "secret_write"
|
||||
ActionOrgTokenMint Action = "org_token_mint"
|
||||
)
|
||||
|
||||
// gated is the set of actions that require a human approval. Add an entry here
|
||||
// (and gate the corresponding handler with requireApproval) to expand the
|
||||
// boundary; remove one to drop a gate. This is the only place the policy lives.
|
||||
var gated = map[Action]bool{
|
||||
ActionDeleteWorkspace: true,
|
||||
ActionDeprovision: true,
|
||||
ActionSecretWrite: true,
|
||||
ActionOrgTokenMint: true,
|
||||
}
|
||||
|
||||
// IsGated reports whether action requires a human approval before executing.
|
||||
func IsGated(action Action) bool {
|
||||
return gated[action]
|
||||
}
|
||||
@@ -271,6 +271,11 @@ func (m *Manager) Reload(ctx context.Context) {
|
||||
ch.Config["_channel_id"] = ch.ID
|
||||
|
||||
go func(a ChannelAdapter, c ChannelRow, pCtx context.Context) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in channel polling goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
if err := a.StartPolling(pCtx, c.Config, m.onInboundMessage); err != nil {
|
||||
log.Printf("Channels: polling error for %s/%s: %v", c.ChannelType, truncID(c.ID), err)
|
||||
}
|
||||
@@ -354,6 +359,11 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
typingCtx, typingCancel := context.WithCancel(fireCtx)
|
||||
defer typingCancel()
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in typing indicator goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
typer.SendTyping(ch.Config, msg.ChatID)
|
||||
ticker := time.NewTicker(4 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -142,7 +142,7 @@ func ghcrAuthHeader() string {
|
||||
log.Printf("workspace-images: failed to marshal GHCR auth: %v", err)
|
||||
return ""
|
||||
}
|
||||
return base64.URLEncoding.EncodeToString(js)
|
||||
return base64.StdEncoding.EncodeToString(js)
|
||||
}
|
||||
|
||||
// Refresh pulls the requested runtimes' template images from GHCR and (if
|
||||
|
||||
@@ -47,9 +47,9 @@ func TestGHCRAuthHeader_EncodesDockerEnginePayload(t *testing.T) {
|
||||
if got == "" {
|
||||
t.Fatal("expected non-empty auth header")
|
||||
}
|
||||
raw, err := base64.URLEncoding.DecodeString(got)
|
||||
raw, err := base64.StdEncoding.DecodeString(got)
|
||||
if err != nil {
|
||||
t.Fatalf("auth header is not valid base64-url: %v", err)
|
||||
t.Fatalf("auth header is not valid base64: %v", err)
|
||||
}
|
||||
var payload map[string]string
|
||||
if err := json.Unmarshal(raw, &payload); err != nil {
|
||||
@@ -80,9 +80,9 @@ func TestGHCRAuthHeader_RespectsRegistryEnv(t *testing.T) {
|
||||
if got == "" {
|
||||
t.Fatal("expected non-empty auth header")
|
||||
}
|
||||
raw, err := base64.URLEncoding.DecodeString(got)
|
||||
raw, err := base64.StdEncoding.DecodeString(got)
|
||||
if err != nil {
|
||||
t.Fatalf("auth header is not valid base64-url: %v", err)
|
||||
t.Fatalf("auth header is not valid base64: %v", err)
|
||||
}
|
||||
var payload map[string]string
|
||||
if err := json.Unmarshal(raw, &payload); err != nil {
|
||||
@@ -220,7 +220,7 @@ func TestGHCRAuthHeader_TrimsWhitespace(t *testing.T) {
|
||||
t.Setenv("GHCR_USER", " alice ")
|
||||
t.Setenv("GHCR_TOKEN", "\tfake-tok-value\n")
|
||||
got := ghcrAuthHeader()
|
||||
raw, _ := base64.URLEncoding.DecodeString(got)
|
||||
raw, _ := base64.StdEncoding.DecodeString(got)
|
||||
var payload map[string]string
|
||||
_ = json.Unmarshal(raw, &payload)
|
||||
if payload["username"] != "alice" {
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
package handlers
|
||||
|
||||
// approval_gate.go — server-side gate for destructive org operations.
|
||||
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
//
|
||||
// requireApproval is the choke point a destructive handler calls before
|
||||
// executing. It is the trust boundary: the platform-management MCP is a CLIENT
|
||||
// of these handlers, so enforcing here (not in the MCP) means anything holding
|
||||
// an org-admin token still goes through the gate. The flow:
|
||||
//
|
||||
// - if a matching APPROVED + unconsumed approval exists, consume it (single-
|
||||
// use) and let the operation proceed;
|
||||
// - otherwise create (or reuse) a PENDING approval, broadcast it to the canvas
|
||||
// (and escalate to the parent if any), and the handler returns HTTP 202 so a
|
||||
// human can decide. The agent retries after approval and the gate passes.
|
||||
//
|
||||
// Matching is by (workspace_id, action, request_hash) where request_hash is a
|
||||
// stable digest of the operation + its context, so a retried op reuses its own
|
||||
// request instead of flooding the table, and an approval for "delete ws A"
|
||||
// cannot be replayed to "delete ws B".
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// approvalRequestHash is a stable digest of the gated operation. Go's
|
||||
// json.Marshal sorts map keys, so the same context always hashes the same.
|
||||
func approvalRequestHash(workspaceID, action string, contextMap map[string]interface{}) string {
|
||||
cj, err := json.Marshal(contextMap)
|
||||
if err != nil || cj == nil {
|
||||
cj = []byte("{}")
|
||||
}
|
||||
sum := sha256.Sum256([]byte(workspaceID + "\x00" + action + "\x00" + string(cj)))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
// requireApproval returns (approved=true, consumedID) when a matching approval
|
||||
// exists and was just consumed; otherwise it creates/reuses a pending approval
|
||||
// and returns (false, pendingID). A non-nil error is a server error.
|
||||
func requireApproval(ctx context.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) (bool, string, error) {
|
||||
hash := approvalRequestHash(workspaceID, string(action), contextMap)
|
||||
|
||||
// 1. Atomically consume an approved + unconsumed request, if one exists.
|
||||
// The conditional UPDATE ... RETURNING makes consumption race-safe: two
|
||||
// concurrent destructive calls cannot both consume the same approval.
|
||||
var consumedID string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
UPDATE approval_requests SET consumed_at = now()
|
||||
WHERE id = (
|
||||
SELECT id FROM approval_requests
|
||||
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3
|
||||
AND status = 'approved' AND consumed_at IS NULL
|
||||
ORDER BY decided_at DESC NULLS LAST
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`, workspaceID, string(action), hash).Scan(&consumedID)
|
||||
if err == nil {
|
||||
return true, consumedID, nil
|
||||
}
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
return false, "", fmt.Errorf("consume approval: %w", err)
|
||||
}
|
||||
|
||||
// 2. No usable approval — create a pending one, or reuse an existing pending
|
||||
// request for the same operation so retries don't flood the table.
|
||||
cj, mErr := json.Marshal(contextMap)
|
||||
if mErr != nil || cj == nil {
|
||||
cj = []byte("{}")
|
||||
}
|
||||
var approvalID string
|
||||
err = db.DB.QueryRowContext(ctx, `
|
||||
WITH existing AS (
|
||||
SELECT id FROM approval_requests
|
||||
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3 AND status = 'pending'
|
||||
LIMIT 1
|
||||
), ins AS (
|
||||
INSERT INTO approval_requests (workspace_id, action, reason, context, request_hash)
|
||||
SELECT $1, $2, $4, $5::jsonb, $3
|
||||
WHERE NOT EXISTS (SELECT 1 FROM existing)
|
||||
RETURNING id
|
||||
)
|
||||
SELECT id FROM ins UNION ALL SELECT id FROM existing LIMIT 1
|
||||
`, workspaceID, string(action), hash, reason, string(cj)).Scan(&approvalID)
|
||||
if err != nil {
|
||||
return false, "", fmt.Errorf("create approval: %w", err)
|
||||
}
|
||||
|
||||
// Broadcast to the canvas (the user-facing signal). For a platform agent the
|
||||
// parent_id is NULL, so the requested-event on its own workspace IS the user
|
||||
// prompt; ordinary workspaces also escalate to their parent.
|
||||
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
}); bErr != nil {
|
||||
log.Printf("approval_gate: broadcast requested failed (ws=%s): %v", workspaceID, bErr)
|
||||
}
|
||||
var parentID *string
|
||||
if pErr := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); pErr != nil {
|
||||
log.Printf("approval_gate: parent lookup failed (ws=%s): %v", workspaceID, pErr)
|
||||
}
|
||||
if parentID != nil {
|
||||
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
}); bErr != nil {
|
||||
log.Printf("approval_gate: broadcast escalated failed (ws=%s): %v", workspaceID, bErr)
|
||||
}
|
||||
}
|
||||
return false, approvalID, nil
|
||||
}
|
||||
|
||||
// gateDestructive runs requireApproval for a gated action and, when approval is
|
||||
// still pending, writes the 202 response and returns false (caller must stop).
|
||||
// Returns true when the caller may proceed (the action is not gated, or an
// approval was just consumed).
|
||||
func gateDestructive(c *gin.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) bool {
|
||||
if !approvals.IsGated(action) {
|
||||
return true
|
||||
}
|
||||
approved, approvalID, err := requireApproval(c.Request.Context(), b, workspaceID, action, reason, contextMap)
|
||||
if err != nil {
|
||||
log.Printf("gateDestructive: %v (ws=%s action=%s)", err, workspaceID, action)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "approval gate failed"})
|
||||
return false
|
||||
}
|
||||
if !approved {
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"status": "pending_approval",
|
||||
"approval_id": approvalID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
})
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -0,0 +1,137 @@
//go:build integration
// +build integration

// approval_gate_integration_test.go — REAL Postgres gate for requireApproval.
//
// Run with:
//
//   INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
//     go test -tags=integration ./internal/handlers/ -run Integration_RequireApproval -v
//
// Why this is NOT a sqlmock test
// ------------------------------
// The whole gate is about row state across calls: a pending request is created
// once and reused (dedup), an approval is consumed exactly once (single-use via
// the conditional UPDATE ... RETURNING), and a different operation context hashes
// to a different request. sqlmock returns whatever the stub says; only a real
// Postgres proves the consume-once semantics and the partial-index lookup.

package handlers

import (
	"context"
	"database/sql"
	"testing"

	"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
	"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
	"github.com/google/uuid"
	_ "github.com/lib/pq"
)

func TestIntegration_RequireApproval_GateCycle(t *testing.T) {
	url := requireIntegrationDBURL(t)
	conn, err := sql.Open("postgres", url)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	if err := conn.Ping(); err != nil {
		t.Fatalf("ping: %v", err)
	}
	t.Cleanup(func() { conn.Close() })

	// requireApproval + the broadcaster's structure_events write use the db.DB
	// global; point it at the integration DB and restore afterwards.
	prev := db.DB
	db.DB = conn
	t.Cleanup(func() { db.DB = prev })
	setupTestRedis(t) // broadcaster publishes to db.RDB; miniredis backs it

	ctx := context.Background()
	b := newTestBroadcaster()

	wsID := uuid.New().String()
	t.Cleanup(func() {
		_, _ = conn.ExecContext(ctx, `DELETE FROM approval_requests WHERE workspace_id = $1`, wsID)
		_, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE id = $1`, wsID)
	})
	// A root workspace (parent_id NULL) — like the platform agent, it has no
	// parent, so the gate's escalation target is the user/canvas. (This branch
	// is off main and has no kind column; the gate is kind-agnostic.)
	if _, err := conn.ExecContext(ctx, `
		INSERT INTO workspaces (id, name, tier, status, runtime, parent_id)
		VALUES ($1, 'Org Concierge', 0, 'online', 'claude-code', NULL)`, wsID); err != nil {
		t.Fatalf("seed root workspace: %v", err)
	}

	action := approvals.ActionDeleteWorkspace
	ctxA := map[string]interface{}{"target": "ws-A"}

	// 1. First call → no approval yet → pending created.
	ok, id1, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
	if err != nil {
		t.Fatalf("call 1: %v", err)
	}
	if ok {
		t.Fatal("call 1: approved=true, want false (no approval exists yet)")
	}

	// 2. Same operation again → must REUSE the same pending row (dedup), not flood.
	ok, id2, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
	if err != nil {
		t.Fatalf("call 2: %v", err)
	}
	if ok || id2 != id1 {
		t.Fatalf("call 2: ok=%v id2=%s, want false and id2==id1(%s) (dedup)", ok, id2, id1)
	}
	var nPending int
	if err := conn.QueryRowContext(ctx,
		`SELECT count(*) FROM approval_requests WHERE workspace_id=$1 AND status='pending'`, wsID).Scan(&nPending); err != nil {
		t.Fatalf("count pending: %v", err)
	}
	if nPending != 1 {
		t.Fatalf("pending rows = %d, want 1 (dedup must not flood)", nPending)
	}

	// 3. A human approves it (simulating the Decide handler).
	if _, err := conn.ExecContext(ctx,
		`UPDATE approval_requests SET status='approved', decided_by='human', decided_at=now() WHERE id=$1`, id1); err != nil {
		t.Fatalf("approve: %v", err)
	}

	// 4. Now the gate consumes the approval and lets the op proceed.
	ok, consumedID, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
	if err != nil {
		t.Fatalf("call 4: %v", err)
	}
	if !ok || consumedID != id1 {
		t.Fatalf("call 4: ok=%v consumedID=%s, want true and id1(%s)", ok, consumedID, id1)
	}

	// 5. Single-use: the SAME approval cannot be replayed — the next call is
	// pending again (a fresh request), not approved.
	ok, id5, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
	if err != nil {
		t.Fatalf("call 5: %v", err)
	}
	if ok {
		t.Fatal("call 5: approved=true — a consumed approval was replayed")
	}
	if id5 == id1 {
		t.Fatal("call 5: reused the consumed request id; want a new pending request")
	}

	// 6. Context isolation: an approval for ws-A must not authorize ws-B.
	// Approve the ws-A request, then a ws-B op must still be pending.
	if _, err := conn.ExecContext(ctx,
		`UPDATE approval_requests SET status='approved', decided_at=now() WHERE id=$1`, id5); err != nil {
		t.Fatalf("approve id5: %v", err)
	}
	ok, _, err = requireApproval(ctx, b, wsID, action, "delete ws-B", map[string]interface{}{"target": "ws-B"})
	if err != nil {
		t.Fatalf("call 6: %v", err)
	}
	if ok {
		t.Fatal("call 6: ws-B proceeded on a ws-A approval — context isolation broken")
	}
}
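The gate cycle exercised by the integration test above (dedup of pending requests, single-use consumption, context isolation) can be modeled in memory as a rough sketch. This is not the project's requireApproval, which is backed by Postgres and is not fully shown in this diff; `memGate`, `require`, `approve`, and the plain string key are hypothetical names introduced here for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

type status int

const (
	pending status = iota
	approved
	consumed
)

type request struct {
	id     int
	key    string // stands in for the hash of (workspace, action, context)
	status status
}

// memGate is a hypothetical in-memory model of the approval gate.
type memGate struct {
	mu     sync.Mutex
	nextID int
	reqs   []*request
}

// require reports whether the operation identified by key may proceed,
// together with the id of the request that was consumed, reused, or created.
func (g *memGate) require(key string) (bool, int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	// Consume an approved request exactly once.
	for _, r := range g.reqs {
		if r.key == key && r.status == approved {
			r.status = consumed
			return true, r.id
		}
	}
	// Reuse an existing pending request instead of creating a duplicate.
	for _, r := range g.reqs {
		if r.key == key && r.status == pending {
			return false, r.id
		}
	}
	// Otherwise open a fresh pending request.
	g.nextID++
	r := &request{id: g.nextID, key: key, status: pending}
	g.reqs = append(g.reqs, r)
	return false, r.id
}

// approve marks a pending request as approved (the human decision).
func (g *memGate) approve(id int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, r := range g.reqs {
		if r.id == id && r.status == pending {
			r.status = approved
		}
	}
}

func main() {
	g := &memGate{}
	_, id1 := g.require("delete:ws-A")
	_, id2 := g.require("delete:ws-A") // dedup: same pending request
	g.approve(id1)
	ok, _ := g.require("delete:ws-A")        // consumes the approval
	replay, id5 := g.require("delete:ws-A") // approval is gone; new pending
	fmt.Println(id1 == id2, ok, replay, id5 != id1) // prints: true true false true
}
```

The mutex plays the role that the conditional UPDATE plays in the database version: only one caller can observe and flip an approved request.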
@@ -0,0 +1,46 @@
package handlers

import (
	"net/http"
	"net/http/httptest"
	"testing"

	"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
	"github.com/gin-gonic/gin"
)

// TestGateDestructive_NonGatedPassesThrough verifies a non-gated action skips
// the gate entirely (no DB access, no 202) so handlers whose action isn't in the
// policy map behave exactly as before.
func TestGateDestructive_NonGatedPassesThrough(t *testing.T) {
	gin.SetMode(gin.TestMode)
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("POST", "/x", nil)

	proceed := gateDestructive(c, newTestBroadcaster(), "ws-1",
		approvals.Action("not_a_gated_action"), "noop", nil)

	if !proceed {
		t.Fatalf("non-gated action must proceed, got proceed=false (status %d)", w.Code)
	}
	if w.Code != http.StatusOK { // CreateTestContext default; nothing written
		t.Errorf("non-gated action wrote a response (status %d), want none", w.Code)
	}
}

// TestApprovalRequestHash_StableAndContextSensitive pins the two properties the
// gate relies on: the same operation hashes identically across calls, and a
// different context yields a different hash (so an approval can't be replayed
// onto a different target).
func TestApprovalRequestHash_StableAndContextSensitive(t *testing.T) {
	a := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "A", "n": 1})
	aAgain := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"n": 1, "target": "A"})
	b := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "B", "n": 1})
	if a != aAgain {
		t.Errorf("hash not stable across equal contexts: %s vs %s", a, aAgain)
	}
	if a == b {
		t.Errorf("hash not context-sensitive: target A and B collided (%s)", a)
	}
}
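The two properties pinned by the hash test above (stable across map key order, sensitive to the context) can be obtained with a canonical serialization followed by a digest. The following is a minimal sketch under stated assumptions, not the project's approvalRequestHash, whose body does not appear in this diff: `requestHash`, the NUL-byte separator, and the choice of SHA-256 are all illustrative. It relies on `encoding/json` writing map keys in sorted order.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// requestHash is a hypothetical stand-in for a stable request hash.
func requestHash(workspaceID, action string, opCtx map[string]interface{}) (string, error) {
	// json.Marshal sorts map keys, so two maps with equal contents
	// serialize to the same bytes regardless of construction order.
	canonical, err := json.Marshal(opCtx)
	if err != nil {
		return "", err
	}
	h := sha256.New()
	// A NUL separator keeps ("ab", "c") distinct from ("a", "bc").
	h.Write([]byte(workspaceID))
	h.Write([]byte{0})
	h.Write([]byte(action))
	h.Write([]byte{0})
	h.Write(canonical)
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	a, _ := requestHash("ws", "delete_workspace", map[string]interface{}{"target": "A", "n": 1})
	aAgain, _ := requestHash("ws", "delete_workspace", map[string]interface{}{"n": 1, "target": "A"})
	b, _ := requestHash("ws", "delete_workspace", map[string]interface{}{"target": "B", "n": 1})
	fmt.Println(a == aAgain, a == b) // prints: true false
}
```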
@@ -173,20 +173,8 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
// check_task_status returned status='queued' forever even after a
|
||||
// real reply landed). messageId mirrors delegation_id so the
|
||||
// platform's idempotency-key extraction also keys off the same id.
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"messageId": delegationID,
|
||||
// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) —
|
||||
// a `type`-keyed Part is dropped by the receiver's v0.3
|
||||
// validator, silently losing the delegated task.
|
||||
"parts": []map[string]interface{}{{"kind": "text", "text": body.Task}},
|
||||
"metadata": map[string]interface{}{"delegation_id": delegationID},
|
||||
},
|
||||
},
|
||||
})
|
||||
// Build A2A payload via helper so contract tests can assert the envelope shape.
|
||||
a2aBody, marshalErr := buildDelegateA2ABody(delegationID, body.Task)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
@@ -374,6 +362,27 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
|
||||
return insertTrackingUnavailable
|
||||
}
|
||||
|
||||
// buildDelegateA2ABody constructs the A2A JSON-RPC envelope for a delegation.
// The returned shape is a schema-valid SendMessageRequest with role="user",
// messageId, parts, and delegation metadata. Extracted to a pure function so
// unit tests can assert the envelope contract without standing up HTTP or DB.
func buildDelegateA2ABody(delegationID, task string) ([]byte, error) {
	return json.Marshal(map[string]interface{}{
		"method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":      "user",
				"messageId": delegationID,
				// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) —
				// a `type`-keyed Part is dropped by the receiver's v0.3
				// validator, silently losing the delegated task.
				"parts":    []map[string]interface{}{{"kind": "text", "text": task}},
				"metadata": map[string]interface{}{"delegation_id": delegationID},
			},
		},
	})
}
|
||||
// executeDelegation runs in a goroutine — sends A2A and stores the result.
|
||||
// Updates delegation status through: pending → dispatched → received → completed/failed
|
||||
// delegationRetryDelay is the pause between the first failed proxy attempt
|
||||
|
||||
@@ -1762,3 +1762,74 @@ func TestListDelegations_LedgerFailedIncludesErrorDetail(t *testing.T) {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- buildDelegateA2ABody: schema-valid SendMessageRequest ----------

// TestBuildDelegateA2ABody_SchemaValidSendMessageRequest pins the contract
// requested by issue #2251: delegate_task must produce a schema-valid A2A
// SendMessageRequest with role="user", messageId, parts, and metadata.
func TestBuildDelegateA2ABody_SchemaValidSendMessageRequest(t *testing.T) {
	delegationID := "del-2251-test"
	task := "write a contract test"

	body, err := buildDelegateA2ABody(delegationID, task)
	if err != nil {
		t.Fatalf("buildDelegateA2ABody failed: %v", err)
	}

	var envelope map[string]interface{}
	if err := json.Unmarshal(body, &envelope); err != nil {
		t.Fatalf("body is not valid JSON: %v", err)
	}

	// Top-level envelope shape
	if envelope["method"] != "message/send" {
		t.Errorf("method = %v, want message/send", envelope["method"])
	}

	params, ok := envelope["params"].(map[string]interface{})
	if !ok {
		t.Fatalf("params missing or not a map: %T", envelope["params"])
	}

	msg, ok := params["message"].(map[string]interface{})
	if !ok {
		t.Fatalf("message missing or not a map: %T", params["message"])
	}

	// Issue #2251: role is required
	if msg["role"] != "user" {
		t.Errorf("message.role = %v, want \"user\"", msg["role"])
	}

	// messageId must be present and match delegationID
	if msg["messageId"] != delegationID {
		t.Errorf("message.messageId = %v, want %s", msg["messageId"], delegationID)
	}

	// parts must be a non-empty list with a text part
	parts, ok := msg["parts"].([]interface{})
	if !ok || len(parts) == 0 {
		t.Fatalf("message.parts missing or empty: %T", msg["parts"])
	}
	firstPart, ok := parts[0].(map[string]interface{})
	if !ok {
		t.Fatalf("first part is not a map: %T", parts[0])
	}
	// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251)
	if firstPart["kind"] != "text" {
		t.Errorf("first part kind = %v, want text", firstPart["kind"])
	}
	if firstPart["text"] != task {
		t.Errorf("first part text = %v, want %q", firstPart["text"], task)
	}

	// metadata.delegation_id must match
	meta, ok := msg["metadata"].(map[string]interface{})
	if !ok {
		t.Fatalf("metadata missing or not a map: %T", msg["metadata"])
	}
	if meta["delegation_id"] != delegationID {
		t.Errorf("metadata.delegation_id = %v, want %s", meta["delegation_id"], delegationID)
	}
}
|
||||
@@ -337,7 +337,7 @@ func TestRegister_ProvisionerURLPreserved(t *testing.T) {
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push").
|
||||
WithArgs("ws-prov", "ws-prov", "http://localhost:8000", `{"name":"agent"}`, "push", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// DB returns provisioner URL (127.0.0.1) — should take precedence over agent-reported URL
|
||||
|
||||
@@ -180,7 +180,7 @@ func TestRegisterHandler(t *testing.T) {
|
||||
|
||||
// Expect the upsert INSERT ... ON CONFLICT
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push").
|
||||
WithArgs("ws-123", "ws-123", "http://localhost:8000", `{"name":"test"}`, "push", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Expect the SELECT url query (for cache URL logic)
|
||||
|
||||
@@ -0,0 +1,122 @@
//go:build integration
// +build integration

// kind_platform_root_integration_test.go — REAL Postgres gate for the
// platform-agent participant kind (RFC docs/design/rfc-platform-agent.md).
//
// Run with:
//
//   INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
//     go test -tags=integration ./internal/handlers/ -run Integration_PlatformKind -v
//
// CI: piggybacks on the handlers-postgres-integration workflow (path filter
// includes workspace-server/internal/handlers/** and migrations/**).
//
// Why this is NOT a sqlmock test
// ------------------------------
// The invariant "a platform agent must be the org root (parent_id IS NULL),
// which structurally also means at most one platform agent per org" is enforced
// by the workspaces_platform_root_check CHECK constraint in migration
// 20260606000000_workspaces_kind. sqlmock cannot execute DDL or evaluate a CHECK
// constraint, so only a real Postgres can prove the constraint actually rejects
// a non-root platform agent and accepts a root one. The Register handler's
// isPlatformRootViolation()/409 path depends on this constraint firing.

package handlers

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"testing"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
)

func integrationDB_PlatformKind(t *testing.T) *sql.DB {
	t.Helper()
	url := requireIntegrationDBURL(t)
	conn, err := sql.Open("postgres", url)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	if err := conn.Ping(); err != nil {
		t.Fatalf("ping: %v", err)
	}
	t.Cleanup(func() { conn.Close() })
	return conn
}

// TestIntegration_PlatformKind_RootAllowed_NonRootRejected proves the three
// guarantees of the kind column against a real Postgres:
//
//  1. a fresh workspace defaults to kind='workspace';
//  2. a root row (parent_id IS NULL) may be kind='platform';
//  3. a non-root row (parent_id set) may NOT be kind='platform' — the
//     workspaces_platform_root_check constraint rejects it (23514).
func TestIntegration_PlatformKind_RootAllowed_NonRootRejected(t *testing.T) {
	conn := integrationDB_PlatformKind(t)
	ctx := context.Background()

	prefix := fmt.Sprintf("itest-kind-%s", uuid.New().String()[:8])
	cleanup := func() {
		if _, err := conn.ExecContext(ctx,
			`DELETE FROM workspaces WHERE name LIKE $1`, prefix+"%"); err != nil {
			t.Logf("cleanup (non-fatal): %v", err)
		}
	}
	t.Cleanup(cleanup)
	cleanup() // pre-test hygiene in the shared integration DB

	rootID := uuid.New().String()
	childID := uuid.New().String()

	// 1. Default kind is 'workspace' when the column is omitted on INSERT.
	if _, err := conn.ExecContext(ctx, `
		INSERT INTO workspaces (id, name, tier, runtime, status, parent_id)
		VALUES ($1, $2, 2, 'claude-code', 'online', NULL)
	`, rootID, prefix+"-root"); err != nil {
		t.Fatalf("seed root: %v", err)
	}
	var gotKind string
	if err := conn.QueryRowContext(ctx,
		`SELECT kind FROM workspaces WHERE id = $1`, rootID).Scan(&gotKind); err != nil {
		t.Fatalf("read kind: %v", err)
	}
	if gotKind != "workspace" {
		t.Fatalf("default kind = %q, want \"workspace\"", gotKind)
	}

	// 2. The root row may become a platform agent.
	if _, err := conn.ExecContext(ctx,
		`UPDATE workspaces SET kind = 'platform' WHERE id = $1`, rootID); err != nil {
		t.Fatalf("promote root to platform: unexpected error: %v", err)
	}

	// A child of the platform root (an ordinary workspace) inserts fine.
	if _, err := conn.ExecContext(ctx, `
		INSERT INTO workspaces (id, name, tier, runtime, status, parent_id)
		VALUES ($1, $2, 2, 'claude-code', 'online', $3)
	`, childID, prefix+"-child", rootID); err != nil {
		t.Fatalf("seed child: %v", err)
	}

	// 3. The non-root child may NOT be a platform agent — the CHECK rejects it.
	_, err := conn.ExecContext(ctx,
		`UPDATE workspaces SET kind = 'platform' WHERE id = $1`, childID)
	if err == nil {
		t.Fatalf("non-root child accepted kind='platform' — constraint did not fire")
	}
	if !strings.Contains(err.Error(), "workspaces_platform_root_check") {
		t.Fatalf("non-root platform rejection wanted workspaces_platform_root_check, got: %v", err)
	}

	// And the unknown-kind value is rejected by workspaces_kind_check.
	_, err = conn.ExecContext(ctx,
		`UPDATE workspaces SET kind = 'bogus' WHERE id = $1`, rootID)
	if err == nil || !strings.Contains(err.Error(), "workspaces_kind_check") {
		t.Fatalf("unknown kind wanted workspaces_kind_check rejection, got: %v", err)
	}
}
@@ -54,6 +54,55 @@ func mcpPost(t *testing.T, h *MCPHandler, workspaceID string, body interface{})
|
||||
return w
|
||||
}
|
||||
|
||||
// assertA2ASendMessageSchema validates that body is a schema-valid A2A
// SendMessageRequest with role="user", messageId, and non-empty parts.
// Issue #2251 contract test: delegate_task must always produce this shape.
func assertA2ASendMessageSchema(t *testing.T, body []byte, wantTask string) {
	t.Helper()
	var envelope map[string]interface{}
	if err := json.Unmarshal(body, &envelope); err != nil {
		t.Fatalf("A2A body is not valid JSON: %v", err)
	}
	if envelope["jsonrpc"] != "2.0" {
		t.Errorf("jsonrpc = %v, want 2.0", envelope["jsonrpc"])
	}
	if envelope["method"] != "message/send" {
		t.Errorf("method = %v, want message/send", envelope["method"])
	}

	params, ok := envelope["params"].(map[string]interface{})
	if !ok {
		t.Fatalf("params missing or not a map: %T", envelope["params"])
	}
	msg, ok := params["message"].(map[string]interface{})
	if !ok {
		t.Fatalf("message missing or not a map: %T", params["message"])
	}

	if msg["role"] != "user" {
		t.Errorf("message.role = %v, want \"user\"", msg["role"])
	}
	// A missing key decodes to nil, which never equals "", so assert on the
	// string value to catch both a missing and an empty messageId.
	if id, _ := msg["messageId"].(string); id == "" {
		t.Error("message.messageId is missing or empty")
	}

	parts, ok := msg["parts"].([]interface{})
	if !ok || len(parts) == 0 {
		t.Fatalf("message.parts missing or empty: %T", msg["parts"])
	}
	firstPart, ok := parts[0].(map[string]interface{})
	if !ok {
		t.Fatalf("first part is not a map: %T", parts[0])
	}
	// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251)
	if firstPart["kind"] != "text" {
		t.Errorf("first part kind = %v, want text", firstPart["kind"])
	}
	if firstPart["text"] != wantTask {
		t.Errorf("first part text = %v, want %q", firstPart["text"], wantTask)
	}
}
|
||||
func expectCanCommunicateSiblings(mock sqlmock.Sqlmock, callerID, targetID, parentID string) {
|
||||
mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(callerID).
|
||||
@@ -209,9 +258,7 @@ func TestMCPHandler_DelegateTask_RoutesThroughPlatformA2AProxy(t *testing.T) {
|
||||
if !logActivity {
|
||||
t.Fatal("delegate_task should log through platform A2A proxy")
|
||||
}
|
||||
if !strings.Contains(string(body), "do work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
assertA2ASendMessageSchema(t, body, "do work")
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
|
||||
}
|
||||
|
||||
@@ -252,9 +299,7 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
if !strings.Contains(string(body), "async work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
assertA2ASendMessageSchema(t, body, "async work")
|
||||
called <- struct{}{}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil
|
||||
}
|
||||
@@ -304,10 +349,8 @@ func TestMCPHandler_DelegateTask_WithAttachments(t *testing.T) {
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
assertA2ASendMessageSchema(t, body, "review this video")
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"text":"review this video"`) {
|
||||
t.Fatalf("A2A body missing task text: %s", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"kind":"video"`) {
|
||||
t.Fatalf("A2A body missing video attachment kind: %s", bodyStr)
|
||||
}
|
||||
@@ -386,6 +429,7 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
|
||||
waitGlobalAsyncForTest()
|
||||
select {
|
||||
case body := <-called:
|
||||
assertA2ASendMessageSchema(t, body, "async work with image")
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"kind":"image"`) {
|
||||
t.Fatalf("A2A body missing image attachment kind: %s", bodyStr)
|
||||
|
||||
@@ -161,7 +161,7 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
|
||||
// 1. Strip plugin's rule/fragment markers from CLAUDE.md (mirrors
|
||||
// AgentskillsAdaptor.uninstall lines 184-188). Best-effort: if
|
||||
// the user edited CLAUDE.md, our marker stays untouched.
|
||||
h.stripPluginMarkersFromMemory(ctx, containerName, pluginName)
|
||||
h.stripPluginMarkersFromMemory(ctx, workspaceID, containerName, pluginName)
|
||||
|
||||
// 2. Remove copied skill dirs declared in the plugin's plugin.yaml.
|
||||
for _, skill := range skillNames {
|
||||
@@ -171,9 +171,11 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
|
||||
log.Printf("Plugin uninstall: skipping invalid skill name %q in %s: %v", skill, pluginName, err)
|
||||
continue
|
||||
}
|
||||
_, _ = h.execAsRoot(ctx, containerName, []string{
|
||||
if _, rmErr := h.execAsRoot(ctx, containerName, []string{
|
||||
"rm", "-rf", "/configs/skills/" + skill,
|
||||
})
|
||||
}); rmErr != nil {
|
||||
log.Printf("Plugin uninstall: failed to remove skill %s from %s: %v", skill, workspaceID, rmErr)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Delete the plugin directory itself (as root to handle file ownership).
|
||||
|
||||
@@ -393,7 +393,7 @@ func (h *PluginsHandler) readPluginSkillsFromContainer(ctx context.Context, cont
|
||||
// `# Plugin: <name> /` — mirrors AgentskillsAdaptor.uninstall's stripping
|
||||
// logic so install/uninstall are symmetric. Best-effort: silent on read or
|
||||
// write failure, since the rest of uninstall must still succeed.
|
||||
func (h *PluginsHandler) stripPluginMarkersFromMemory(ctx context.Context, containerName, pluginName string) {
|
||||
func (h *PluginsHandler) stripPluginMarkersFromMemory(ctx context.Context, workspaceID, containerName, pluginName string) {
|
||||
// Use sed via bash -c for atomic in-place delete: drop the marker line
|
||||
// and the blank line that follows it (install adds a leading blank line
|
||||
// before the marker via append_to_memory). Three sed passes mirror the
|
||||
@@ -417,7 +417,9 @@ func (h *PluginsHandler) stripPluginMarkersFromMemory(ctx context.Context, conta
|
||||
`awk 'BEGIN{skip=0; blanks=0} /^%s/{skip=1; blanks=0; next} skip==1 && /^[[:space:]]*$/{blanks++; if(blanks>=2){skip=0; print; next} next} /^# Plugin: /{if(skip==1)skip=0} skip==1{next} {print}' /configs/CLAUDE.md > /tmp/claude.new && mv /tmp/claude.new /configs/CLAUDE.md`,
|
||||
regexpEscapeForAwk(marker),
|
||||
)
|
||||
_, _ = h.execAsRoot(ctx, containerName, []string{"bash", "-c", script})
|
||||
if _, awkErr := h.execAsRoot(ctx, containerName, []string{"bash", "-c", script}); awkErr != nil {
|
||||
log.Printf("Plugin uninstall: failed to strip markers from CLAUDE.md for %s in %s: %v", pluginName, workspaceID, awkErr)
|
||||
}
|
||||
}
|
||||
|
||||
// regexpEscapeForAwk escapes characters that have special meaning inside an
|
||||
|
||||
@@ -164,6 +164,20 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
|
||||
return models.DeliveryModePush, nil
|
||||
}
|
||||
|
||||
// errPlatformNotRoot is the client-facing message when a register call tried to
// mark a non-root workspace as a platform agent.
const errPlatformNotRoot = "a platform agent must be the org root (parent_id must be null)"

// isPlatformRootViolation reports whether err is the DB rejecting a register
// that tried to mark a non-root workspace as a platform agent (the
// workspaces_platform_root_check CHECK constraint). The handler maps it to a
// friendly HTTP 409 instead of a raw 500. The invariant — platform == org root,
// which structurally also guarantees one platform agent per org — is enforced
// race-proof at the DB level; this is just the friendly surface.
func isPlatformRootViolation(err error) bool {
	return err != nil && strings.Contains(err.Error(), "workspaces_platform_root_check")
}
|
||||
// Returns a non-nil error suitable for including in a 400 Bad Request response.
|
||||
func validateAgentURL(rawURL string) error {
|
||||
if rawURL == "" {
|
||||
@@ -277,6 +291,14 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Validate explicit kind if the agent declared one; empty is allowed and
|
||||
// resolves to the row's existing value (or "workspace" default) in
|
||||
// resolveKind below. Only the platform-agent container declares 'platform'.
|
||||
if payload.Kind != "" && !models.IsValidKind(payload.Kind) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'workspace' or 'platform'"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// C18: prevent workspace URL hijacking on re-registration.
|
||||
@@ -390,9 +412,15 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
// the row. Without this guard, bulk deletes left tier-3 stragglers because
|
||||
// the last pre-teardown heartbeat flipped status back to 'online' after
|
||||
// Delete's UPDATE.
|
||||
// kind ($6) is the raw payload value (validated above; "" = unspecified).
|
||||
// COALESCE(NULLIF($6,''), …) means: an explicit kind wins; an unspecified
|
||||
// kind defaults to 'workspace' for a NEW row and KEEPS the existing kind on
|
||||
// re-register (so a platform agent re-registering without kind is never
|
||||
// downgraded). A non-root row asking for 'platform' is rejected by the
|
||||
// workspaces_platform_root_check constraint → friendly 409 below.
|
||||
_, err = db.DB.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode)
|
||||
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5)
|
||||
INSERT INTO workspaces (id, name, url, agent_card, status, last_heartbeat_at, delivery_mode, kind)
|
||||
VALUES ($1, $2, $3, $4::jsonb, 'online', now(), $5, COALESCE(NULLIF($6, ''), 'workspace'))
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
url = CASE
|
||||
WHEN workspaces.url LIKE 'http://127.0.0.1%' THEN workspaces.url
|
||||
@@ -402,10 +430,15 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
status = 'online',
|
||||
last_heartbeat_at = now(),
|
||||
delivery_mode = EXCLUDED.delivery_mode,
|
||||
kind = COALESCE(NULLIF($6, ''), workspaces.kind),
|
||||
updated_at = now()
|
||||
WHERE workspaces.status IS DISTINCT FROM 'removed'
|
||||
`, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert)
|
||||
`, payload.ID, payload.ID, urlForUpsert, agentCardStr, modeForUpsert, payload.Kind)
|
||||
if err != nil {
|
||||
if isPlatformRootViolation(err) {
|
||||
c.JSON(http.StatusConflict, gin.H{"error": errPlatformNotRoot})
|
||||
return
|
||||
}
|
||||
log.Printf("Registry register error: %v (id=%s)", err, payload.ID)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"})
|
||||
return
|
||||
|
||||
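The kind resolution that the Register upsert above expresses in SQL, `COALESCE(NULLIF($6, ''), 'workspace')` on insert and `COALESCE(NULLIF($6, ''), workspaces.kind)` on conflict, can be restated as a small pure function. This is only a sketch of those semantics for reading purposes: `effectiveKind` and its parameters are hypothetical names, the handler itself delegates the decision to the database, and the sketch assumes the stored kind is never NULL.

```go
package main

import "fmt"

// effectiveKind mirrors the upsert's COALESCE/NULLIF logic (hypothetical helper).
// requested is the raw payload value ("" means unspecified), existing is the
// kind already stored on the row, and rowExists says whether the upsert hits
// the ON CONFLICT branch.
func effectiveKind(requested, existing string, rowExists bool) string {
	if requested != "" {
		return requested // an explicit kind always wins
	}
	if rowExists {
		return existing // re-register without kind keeps the stored value
	}
	return "workspace" // new row with no kind gets the default
}

func main() {
	fmt.Println(effectiveKind("", "", false))                 // prints: workspace
	fmt.Println(effectiveKind("", "platform", true))          // prints: platform
	fmt.Println(effectiveKind("platform", "workspace", true)) // prints: platform
}
```

The second case is the one the source comment calls out: a platform agent that re-registers without declaring a kind is not downgraded. Whether an explicit 'platform' is actually accepted is still decided by the CHECK constraint, which this sketch does not model.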
@@ -72,7 +72,7 @@ func TestRegister_DBError(t *testing.T) {
|
||||
|
||||
// DB insert fails
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push").
|
||||
WithArgs("ws-fail", "ws-fail", "http://localhost:8000", `{"name":"test"}`, "push", "").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@@ -647,7 +647,7 @@ func TestRegister_GuardAgainstResurrectingRemovedRow(t *testing.T) {
|
||||
// This regex-ish match requires the guard. If the handler ever drops
|
||||
// the clause the test fails because the emitted SQL won't match.
|
||||
mock.ExpectExec("ON CONFLICT.*WHERE workspaces.status IS DISTINCT FROM 'removed'").
|
||||
WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push").
|
||||
WithArgs("ws-resurrect", "ws-resurrect", "http://localhost:8000", `{"name":"x"}`, "push", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows affected = correctly guarded
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs("ws-resurrect").
|
||||
@@ -917,7 +917,7 @@ func TestRegister_C18_BootstrapAllowedNoTokens(t *testing.T) {
|
||||
|
||||
// Workspace upsert proceeds normally.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push").
|
||||
WithArgs("ws-new", "ws-new", "http://localhost:9100", `{"name":"new-agent"}`, "push", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
@@ -1228,7 +1228,7 @@ func TestRegister_DBErrorResponseIsOpaque(t *testing.T) {
|
||||
|
||||
// DB upsert fails with a descriptive internal error.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push").
|
||||
WithArgs("ws-errtest", "ws-errtest", "http://localhost:9200", `{"name":"err-agent"}`, "push", "").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@@ -1476,7 +1476,7 @@ func TestRegister_PollMode_AcceptsEmptyURL(t *testing.T) {
|
||||
|
||||
// Upsert MUST run with empty URL (sql.NullString) and delivery_mode=poll.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"poll-agent"}`, "poll", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// SELECT url for cache: returns NULL/empty for poll-mode rows. The
|
||||
@@ -1591,6 +1591,89 @@ func TestRegister_InvalidDeliveryMode(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_InvalidKind rejects payloads that declare an unrecognised kind —
|
||||
// only 'workspace' and 'platform' are valid. Mirrors the delivery_mode guard;
|
||||
// the rejection happens before any DB access.
|
||||
func TestRegister_InvalidKind(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/registry/register",
|
||||
bytes.NewBufferString(`{"id":"ws-x","url":"http://localhost:8000","agent_card":{"name":"a"},"kind":"bogus"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Register(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("invalid kind: expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "kind") {
|
||||
t.Errorf("expected error body to mention kind, got: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_PlatformKind_PersistsKind verifies that a workspace registering
|
||||
// with kind="platform" has that value written through the upsert (the platform
|
||||
// agent self-registers as the org root). The platform==root invariant itself is
|
||||
// enforced by the workspaces_platform_root_check DB constraint and exercised by
|
||||
// the integration test, which sqlmock cannot enforce.
|
||||
func TestRegister_PlatformKind_PersistsKind(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
const wsID = "ws-platform-agent"
|
||||
|
||||
// Bootstrap path — no live tokens.
|
||||
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
// delivery_mode="push" is set explicitly, so resolveDeliveryMode
|
||||
// short-circuits (no SELECT delivery_mode lookup). The upsert MUST carry
|
||||
// kind="platform" as the 6th arg.
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, "http://localhost:9100", `{"name":"concierge"}`, "push", "platform").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("http://localhost:9100"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Token issuance — first-register path.
|
||||
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/registry/register",
|
||||
bytes.NewBufferString(`{"id":"`+wsID+`","url":"http://localhost:9100","delivery_mode":"push","kind":"platform","agent_card":{"name":"concierge"}}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Register(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("platform register: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_PollMode_PreservesExistingValue: when the row already
|
||||
// has delivery_mode=poll and the payload doesn't set it, the resolved
|
||||
// mode should be poll — i.e. "absent payload mode" must NOT silently
|
||||
@@ -1616,7 +1699,7 @@ func TestRegister_PollMode_PreservesExistingValue(t *testing.T) {
|
||||
// Upsert carries the resolved poll mode forward — even though
|
||||
// payload didn't restate it. URL still empty (poll-mode shape).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
@@ -1685,7 +1768,7 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
|
||||
AddRow(sql.NullString{}, "external"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
@@ -1744,7 +1827,7 @@ func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) {
|
||||
AddRow(sql.NullString{}, "kimi-cli"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
@@ -1804,7 +1887,7 @@ func TestRegister_NonExternalRuntime_StillDefaultsToPush(t *testing.T) {
|
||||
AddRow(sql.NullString{}, "claude-code"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, "http://localhost:8000", `{"name":"a"}`, "push").
|
||||
WithArgs(wsID, wsID, "http://localhost:8000", `{"name":"a"}`, "push", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -113,6 +114,11 @@ func (h *WorkspaceHandler) goAsync(fn func()) {
|
||||
h.asyncWG.Add(1)
|
||||
go func() {
|
||||
defer h.asyncWG.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in goAsync goroutine: %v\n%s", r, debug.Stack())
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
@@ -151,6 +157,11 @@ func globalGoAsync(fn func()) {
|
||||
globalAsync.Add(1)
|
||||
go func() {
|
||||
defer globalAsync.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in globalGoAsync goroutine: %v\n%s", r, debug.Stack())
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -13,11 +13,16 @@ import (
|
||||
const DefaultMaxConcurrentTasks = 1
|
||||
|
||||
type Workspace struct {
|
||||
ID string `json:"id" db:"id"`
|
||||
Name string `json:"name" db:"name"`
|
||||
Role sql.NullString `json:"role" db:"role"`
|
||||
Tier int `json:"tier" db:"tier"`
|
||||
Status string `json:"status" db:"status"`
|
||||
ID string `json:"id" db:"id"`
|
||||
Name string `json:"name" db:"name"`
|
||||
Role sql.NullString `json:"role" db:"role"`
|
||||
Tier int `json:"tier" db:"tier"`
|
||||
Status string `json:"status" db:"status"`
|
||||
// Kind: "workspace" (default) or "platform". A "platform" workspace is the
|
||||
// org-level concierge (the platform agent) that sits at the org root and is
|
||||
// the user's default A2A target. See migration
|
||||
// 20260606000000_workspaces_kind + RFC docs/design/rfc-platform-agent.md.
|
||||
Kind string `json:"kind" db:"kind"`
|
||||
SourceBundleID sql.NullString `json:"source_bundle_id" db:"source_bundle_id"`
|
||||
AgentCard json.RawMessage `json:"agent_card" db:"agent_card"`
|
||||
URL sql.NullString `json:"url" db:"url"`
|
||||
@@ -63,6 +68,21 @@ func IsValidDeliveryMode(s string) bool {
|
||||
return s == DeliveryModePush || s == DeliveryModePoll
|
||||
}
|
||||
|
||||
// Workspace kind constants. Matches the CHECK constraint in migration
|
||||
// 20260606000000_workspaces_kind. KindPlatform marks the org-level concierge
|
||||
// (the platform agent) which sits at the org root; see
|
||||
// docs/design/rfc-platform-agent.md.
|
||||
const (
|
||||
KindWorkspace = "workspace"
|
||||
KindPlatform = "platform"
|
||||
)
|
||||
|
||||
// IsValidKind reports whether s is a recognised workspace kind. Empty string is
|
||||
// NOT valid here — callers resolve the default (KindWorkspace) before calling.
|
||||
func IsValidKind(s string) bool {
|
||||
return s == KindWorkspace || s == KindPlatform
|
||||
}
|
||||
|
||||
type RegisterPayload struct {
|
||||
ID string `json:"id" binding:"required"`
|
||||
// URL is required for push-mode workspaces; optional / unused for
|
||||
@@ -76,6 +96,12 @@ type RegisterPayload struct {
|
||||
// value on the workspace row, or default to push for new rows".
|
||||
// When set, must be one of DeliveryModePush / DeliveryModePoll.
|
||||
DeliveryMode string `json:"delivery_mode,omitempty"`
|
||||
// Kind is optional. Empty string means "keep the existing value on the
|
||||
// workspace row, or default to KindWorkspace for new rows". When set, must
|
||||
// be one of KindWorkspace / KindPlatform. KindPlatform additionally requires
// the row to be its own org root (parent_id IS NULL); the DB constraint
// workspaces_platform_root_check enforces this and Register answers 409.
|
||||
Kind string `json:"kind,omitempty"`
|
||||
}
|
||||
|
||||
type HeartbeatPayload struct {
|
||||
|
||||
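The `RegisterPayload.Kind` comment above describes an empty value as "keep the existing value, or default to KindWorkspace for new rows", and `IsValidKind` expects that default to be resolved before it is called. The sketch below shows one way that resolution could look on the Go side. `resolveKind` and `errInvalidKind` are hypothetical names; the tests in this diff suggest the real handler passes the raw payload value into the upsert, so treat this as an illustration rather than the handler's actual logic.

```go
package main

import (
	"errors"
	"fmt"
)

// Constants and validator copied from the diff so the sketch is self-contained.
const (
	KindWorkspace = "workspace"
	KindPlatform  = "platform"
)

func IsValidKind(s string) bool {
	return s == KindWorkspace || s == KindPlatform
}

// errInvalidKind is a hypothetical sentinel error for this sketch.
var errInvalidKind = errors.New("kind must be 'workspace' or 'platform'")

// resolveKind is a hypothetical helper: an empty payload kind keeps the
// existing row's kind, or falls back to KindWorkspace when there is no row.
// A non-empty payload kind must pass IsValidKind.
func resolveKind(payloadKind, existingKind string) (string, error) {
	if payloadKind == "" {
		if existingKind != "" {
			return existingKind, nil
		}
		return KindWorkspace, nil
	}
	if !IsValidKind(payloadKind) {
		return "", errInvalidKind
	}
	return payloadKind, nil
}

func main() {
	for _, in := range []string{"", "platform", "bogus"} {
		kind, err := resolveKind(in, "")
		fmt.Printf("payload=%q resolved=%q err=%v\n", in, kind, err)
	}
}
```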
@@ -34,6 +34,35 @@ func TestIsValidDeliveryMode_Invalid(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== IsValidKind ====================
|
||||
|
||||
func TestIsValidKind_Valid(t *testing.T) {
|
||||
for _, k := range []string{KindWorkspace, KindPlatform} {
|
||||
if !IsValidKind(k) {
|
||||
t.Errorf("IsValidKind(%q) = false, want true", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsValidKind_Invalid(t *testing.T) {
|
||||
cases := []struct {
|
||||
val string
|
||||
want bool
|
||||
}{
|
||||
{"", false}, // empty is not valid — callers resolve the default
|
||||
{"platforms", false}, // typo
|
||||
{"Platform", false}, // case-sensitive
|
||||
{"platform ", false}, // trailing space
|
||||
{"root", false}, // not a kind
|
||||
{"user", false}, // the user is implicit, not a workspace kind
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := IsValidKind(tc.val); got != tc.want {
|
||||
t.Errorf("IsValidKind(%q) = %v, want %v", tc.val, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== WorkspaceStatus ====================
|
||||
|
||||
func TestWorkspaceStatus_String(t *testing.T) {
|
||||
|
||||
@@ -199,6 +199,11 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
// entry/exit — those are kept as redundant signals but this pulse is the
|
||||
// one that guarantees liveness freshness regardless of tick state.
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in scheduler heartbeat goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
pulseTicker := time.NewTicker(10 * time.Second)
|
||||
defer pulseTicker.Stop()
|
||||
for {
|
||||
@@ -638,6 +643,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
summary := s.extractResponseSummary(respBody)
|
||||
if summary != "" {
|
||||
go func(wsID, text string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in broadcast summary goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer postCancel()
|
||||
s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text)
|
||||
|
||||
@@ -0,0 +1,7 @@
-- Reverse the participant-kind discriminator.
-- Non-destructive: dropping the column makes every workspace an ordinary
-- workspace again (the platform agent loses its marker but its row survives).
DROP INDEX IF EXISTS idx_workspaces_kind;
ALTER TABLE workspaces DROP CONSTRAINT IF EXISTS workspaces_platform_root_check;
ALTER TABLE workspaces DROP CONSTRAINT IF EXISTS workspaces_kind_check;
ALTER TABLE workspaces DROP COLUMN IF EXISTS kind;
@@ -0,0 +1,45 @@
-- Participant-kind discriminator for the org-level platform agent.
-- (RFC: docs/design/rfc-platform-agent.md)
--
-- 'workspace' (default) = an ordinary workspace / agent.
-- 'platform'            = the org-level concierge (the "platform agent"). It is
--                         the single org root (parent_id IS NULL) and the user's
--                         default A2A chat target. Exactly one per org.
--
-- There is no org_id column: an "org" is the parent_id-chain root resolved by
-- org_scope.go (orgRootID/sameOrg). "platform == org root" is enforced by the
-- workspaces_platform_root_check constraint below, which also caps each org at
-- one platform agent; the handlers only pre-check it. This column is only the
-- discriminator (default-target / billing exclusion / UX), defined once here
-- and mirrored by the Go constants models.KindWorkspace / models.KindPlatform.
--
-- Backward-compatible: every existing row defaults to 'workspace'. The CHECK is
-- added NOT VALID then validated so the ALTER can never fail on legacy data.
ALTER TABLE workspaces
    ADD COLUMN IF NOT EXISTS kind TEXT NOT NULL DEFAULT 'workspace';

DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'workspaces_kind_check') THEN
        ALTER TABLE workspaces
            ADD CONSTRAINT workspaces_kind_check CHECK (kind IN ('workspace', 'platform')) NOT VALID;
        ALTER TABLE workspaces VALIDATE CONSTRAINT workspaces_kind_check;
    END IF;
END $$;

-- platform == org root, enforced at the DB level (race-proof). A platform agent
-- MUST have parent_id IS NULL. Because an org is the subtree under a single
-- parent_id IS NULL root (org_scope.go) and only a root may be 'platform', this
-- also structurally guarantees at most ONE platform agent per org. The handler
-- additionally pre-checks this to return a friendly 409 instead of a raw 23514.
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'workspaces_platform_root_check') THEN
        ALTER TABLE workspaces
            ADD CONSTRAINT workspaces_platform_root_check
            CHECK (kind <> 'platform' OR parent_id IS NULL) NOT VALID;
        ALTER TABLE workspaces VALIDATE CONSTRAINT workspaces_platform_root_check;
    END IF;
END $$;

CREATE INDEX IF NOT EXISTS idx_workspaces_kind ON workspaces(kind);
|
||||
@@ -0,0 +1,5 @@
-- Reverse the approval-gate single-use/dedup columns.
DROP INDEX IF EXISTS approval_requests_gate_idx;
ALTER TABLE approval_requests
    DROP COLUMN IF EXISTS request_hash,
    DROP COLUMN IF EXISTS consumed_at;
@@ -0,0 +1,18 @@
-- Single-use + dedup support for the destructive-op approval gate.
-- (RFC docs/design/rfc-platform-agent.md, Phase 4)
--
-- consumed_at: an approval is single-use. Once a destructive op consumes an
-- approved request, consumed_at is stamped so the same approval can't be
-- replayed for a second destructive call.
-- request_hash: a stable hash of (workspace_id, action, context) so a repeated
-- destructive attempt matches its own pending/approved request instead of
-- flooding the table with duplicates.
ALTER TABLE approval_requests
    ADD COLUMN IF NOT EXISTS consumed_at TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS request_hash TEXT;

-- Hot path: the gate looks up an approved + unconsumed row matching
-- (workspace_id, action, request_hash). Partial index keeps that O(log live).
CREATE INDEX IF NOT EXISTS approval_requests_gate_idx
    ON approval_requests (workspace_id, action, request_hash)
    WHERE status = 'approved' AND consumed_at IS NULL;
|
||||
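The migration above describes `request_hash` as a stable hash of (workspace_id, action, context). A minimal sketch of one way to compute such a value follows. The choice of SHA-256, the length-prefixed framing, and the names `requestHash` and `contextJSON` are all assumptions for illustration; the diff does not show how the real hash is built. The sketch also assumes the context has already been serialised canonically (for example, JSON with sorted keys), since otherwise equal contexts could hash differently.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// requestHash is a hypothetical helper. It hashes the three fields with an
// 8-byte big-endian length prefix in front of each one, so that
// ("ab", "c") and ("a", "bc") can never produce the same byte stream.
func requestHash(workspaceID, action, contextJSON string) string {
	h := sha256.New()
	for _, field := range []string{workspaceID, action, contextJSON} {
		var n [8]byte
		binary.BigEndian.PutUint64(n[:], uint64(len(field)))
		h.Write(n[:])
		h.Write([]byte(field))
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := requestHash("ws-1", "delete_workspace", `{"target":"ws-2"}`)
	b := requestHash("ws-1", "delete_workspace", `{"target":"ws-2"}`)
	fmt.Println(a == b) // a repeated attempt maps to the same hash
	fmt.Println(len(a)) // hex-encoded SHA-256 digest length
}
```

Because the same inputs always give the same digest, a retried destructive call can be matched against its existing pending or approved row through the `(workspace_id, action, request_hash)` index.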