Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3e5a215028 | |||
| 76609f4129 | |||
| 8439a066b6 | |||
| d7d376118d | |||
| 026d1c5fae | |||
| 48ad38e795 | |||
| 4bdb10b5e2 | |||
| 6452456f75 | |||
| 4978601032 | |||
| ec3e27a4ec | |||
| 4cc0e32a53 | |||
| e9693e12ff | |||
| bcca139caa | |||
| 6cf6e608d8 | |||
| 6947774e1b | |||
| 9afecfdfc7 | |||
| 220ee57d0c | |||
| 2751861b04 | |||
| da416caeca | |||
| 250af4df36 | |||
| 884bb8c09f | |||
| 0c152a24d2 | |||
| 3345544921 | |||
| 8e2597c877 | |||
| d241dd7f9e | |||
| d437c31da4 |
@@ -203,17 +203,12 @@ def ci_jobs_all(ci_doc: dict) -> set[str]:
|
||||
|
||||
def ci_job_names(ci_doc: dict) -> set[str]:
|
||||
"""Set of job keys in ci.yml MINUS the sentinel itself MINUS jobs
|
||||
whose `if:` gates on `github.event_name` or `github.ref` (those are
|
||||
event-scoped and can legitimately be `skipped` for a given trigger;
|
||||
if we required them under the sentinel `needs:`, every PR-only job
|
||||
whose `if:` gates on `github.event_name` (those are event-scoped
|
||||
and can legitimately be `skipped` for a given trigger; if we
|
||||
required them under the sentinel `needs:`, every PR-only job
|
||||
would be `skipped` on push and the sentinel would interpret
|
||||
`skipped != success` as failure). RFC §4 spec.
|
||||
|
||||
`github.ref` is the companion gate for jobs that run only on direct
|
||||
pushes to specific branches (e.g. `github.ref == 'refs/heads/main'`).
|
||||
These never execute in a PR context, so flagging them as missing
|
||||
from `all-required.needs:` is a false positive (mc#958 / mc#959).
|
||||
|
||||
Used for F1 (jobs missing from sentinel needs). NOT used for F1b
|
||||
(typos in needs) — see `ci_jobs_all` for that."""
|
||||
jobs = ci_doc.get("jobs")
|
||||
@@ -226,9 +221,7 @@ def ci_job_names(ci_doc: dict) -> set[str]:
|
||||
continue
|
||||
if isinstance(v, dict):
|
||||
gate = v.get("if")
|
||||
if isinstance(gate, str) and (
|
||||
"github.event_name" in gate or "github.ref" in gate
|
||||
):
|
||||
if isinstance(gate, str) and "github.event_name" in gate:
|
||||
continue
|
||||
names.add(k)
|
||||
return names
|
||||
|
||||
@@ -417,21 +417,7 @@ def main() -> int:
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
args = parser.parse_args()
|
||||
_require_runtime_env()
|
||||
try:
|
||||
return process_once(dry_run=args.dry_run)
|
||||
except ApiError as exc:
|
||||
# API errors (401/403/404/500) are transient for a queue tick —
|
||||
# log and exit 0 so the workflow is not marked failed and the next
|
||||
# tick can retry. Returning non-zero would permanently fail the
|
||||
# workflow run, blocking future ticks.
|
||||
sys.stderr.write(f"::error::queue API error: {exc}\n")
|
||||
return 0
|
||||
except urllib.error.URLError as exc:
|
||||
sys.stderr.write(f"::error::queue network error: {exc}\n")
|
||||
return 0
|
||||
except TimeoutError as exc:
|
||||
sys.stderr.write(f"::error::queue timeout: {exc}\n")
|
||||
return 0
|
||||
return process_once(dry_run=args.dry_run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Executable → Regular
+39
-180
@@ -109,58 +109,59 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
|
||||
# Optional trailing note after the slug for /sop-ack and required reason
|
||||
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not
|
||||
# yet validated; future iteration may require a min-length).
|
||||
#
|
||||
# /sop-n/a <gate> [reason] — declares a gate as not-applicable.
|
||||
# <gate> is a canonical gate name (qa-review, security-review).
|
||||
# The declaring user must be in one of the gate's required_teams.
|
||||
# Most-recent per-user declaration wins (revoke semantics mirror ack).
|
||||
_DIRECTIVE_RE = re.compile(
|
||||
r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
|
||||
re.MULTILINE,
|
||||
)
|
||||
_NA_DIRECTIVE_RE = re.compile(
|
||||
r"^[ \t]*/sop-n/?a[ \t]+([A-Za-z0-9_\-]+)(?:[ \t]+(.*))?[ \t]*$",
|
||||
re.MULTILINE,
|
||||
)
|
||||
|
||||
|
||||
def parse_directives(
|
||||
comment_body: str,
|
||||
numeric_aliases: dict[int, str],
|
||||
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
|
||||
"""Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
|
||||
) -> tuple[list[tuple[str, str, str]], list]:
|
||||
"""Extract /sop-ack and /sop-revoke directives from a comment body.
|
||||
|
||||
Returns a tuple of two lists:
|
||||
0. list of (kind, canonical_slug, note) for sop-ack/sop-revoke
|
||||
1. list of (kind, gate_name, reason) for sop-n/a
|
||||
|
||||
canonical_slug is the normalized form (or "" if unparseable).
|
||||
note/reason is the trailing free-text (may be "").
|
||||
Returns (directives, na_directives) where:
|
||||
directives is a list of (kind, canonical_slug, note) tuples
|
||||
kind is "sop-ack" or "sop-revoke"
|
||||
canonical_slug is the normalized form (or "" if unparseable)
|
||||
note is the trailing free-text (may be "")
|
||||
na_directives is reserved for future N/A handling (always [] for now)
|
||||
"""
|
||||
out: list[tuple[str, str, str]] = []
|
||||
na_out: list[tuple[str, str, str]] = []
|
||||
if not comment_body:
|
||||
return out, na_out
|
||||
return out, []
|
||||
for m in _DIRECTIVE_RE.finditer(comment_body):
|
||||
kind = m.group(1)
|
||||
raw_slug = (m.group(2) or "").strip()
|
||||
# If the raw match included trailing words, the regex non-greedy
|
||||
# captured only the first token; strip again for safety.
|
||||
# We split on whitespace to keep the FIRST word as the slug, and
|
||||
# everything after as the note.
|
||||
parts = raw_slug.split()
|
||||
if not parts:
|
||||
continue
|
||||
first = parts[0]
|
||||
# If the slug-capture greedily matched multiple words (e.g.
|
||||
# "comprehensive testing"), preserve normalize behavior: join
|
||||
# the WHOLE first-word-token only; trailing words get appended to
|
||||
# the note. The regex limits group(2) to [A-Za-z0-9_\- ] so we
|
||||
# may have multi-word forms here — normalize handles them.
|
||||
if len(parts) > 1:
|
||||
# User wrote "/sop-ack comprehensive testing extra-note"
|
||||
# → treat "comprehensive testing" as the slug source if it
|
||||
# normalizes to a known item; otherwise treat "comprehensive"
|
||||
# as slug and "testing extra-note" as note. We defer the
|
||||
# disambiguation to the caller via the returned canonical
|
||||
# slug. For simplicity: try the WHOLE captured string first.
|
||||
canonical = normalize_slug(raw_slug, numeric_aliases)
|
||||
else:
|
||||
canonical = normalize_slug(first, numeric_aliases)
|
||||
note_from_group = (m.group(3) or "").strip()
|
||||
# If we collapsed multi-word slug into kebab and there's a
|
||||
# trailing-text group too, append it.
|
||||
out.append((kind, canonical, note_from_group))
|
||||
|
||||
for m in _NA_DIRECTIVE_RE.finditer(comment_body):
|
||||
gate = (m.group(1) or "").strip().lower()
|
||||
reason = (m.group(2) or "").strip()
|
||||
na_out.append(("sop-n/a", gate, reason))
|
||||
|
||||
return out, na_out
|
||||
return out, []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -231,8 +232,9 @@ def compute_ack_state(
|
||||
{
|
||||
"comprehensive-testing": {
|
||||
"ackers": ["bob"], # non-author, team-verified
|
||||
"rejected": {
|
||||
"rejected_ackers": { # debugging info
|
||||
"self_ack": ["alice"],
|
||||
"unknown_slug": [],
|
||||
"not_in_team": ["eve"],
|
||||
}
|
||||
},
|
||||
@@ -249,7 +251,7 @@ def compute_ack_state(
|
||||
user = (c.get("user") or {}).get("login", "")
|
||||
if not user:
|
||||
continue
|
||||
directives, _na_directives = parse_directives(body, numeric_aliases)
|
||||
directives, _na = parse_directives(body, numeric_aliases)
|
||||
for kind, slug, _note in directives:
|
||||
if not slug:
|
||||
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
|
||||
@@ -260,19 +262,25 @@ def compute_ack_state(
|
||||
# Filter out self-acks and unknown slugs.
|
||||
ackers_per_slug: dict[str, list[str]] = {s: [] for s in items_by_slug}
|
||||
rejected_self: dict[str, list[str]] = {s: [] for s in items_by_slug}
|
||||
rejected_unknown: dict[str, list[str]] = {s: [] for s in items_by_slug}
|
||||
pending_team_check: dict[str, list[str]] = {s: [] for s in items_by_slug}
|
||||
|
||||
for (user, slug), kind in latest_directive.items():
|
||||
if kind != "sop-ack":
|
||||
continue # revokes leave the (user,slug) state as "no ack"
|
||||
if slug not in items_by_slug:
|
||||
# Slug normalized to something not in our config — store
|
||||
# under a synthetic key for diagnostic surfacing. Don't add
|
||||
# to any item.
|
||||
continue
|
||||
if user == pr_author:
|
||||
rejected_self[slug].append(user)
|
||||
continue
|
||||
pending_team_check[slug].append(user)
|
||||
|
||||
# Step 3: team membership probe per slug.
|
||||
# Step 3: team membership probe per slug (batched per slug to keep
|
||||
# API call count down — same user may ack multiple items but the
|
||||
# required_teams differ per item, so we MUST probe per (user, item)).
|
||||
rejected_not_in_team: dict[str, list[str]] = {s: [] for s in items_by_slug}
|
||||
for slug, candidates in pending_team_check.items():
|
||||
if not candidates:
|
||||
@@ -281,6 +289,7 @@ def compute_ack_state(
|
||||
approved = team_membership_probe(slug, candidates) # returns subset
|
||||
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
|
||||
ackers_per_slug[slug] = approved
|
||||
# Stash required teams for description rendering.
|
||||
items_by_slug[slug]["_required_resolved"] = required
|
||||
|
||||
return {
|
||||
@@ -295,113 +304,6 @@ def compute_ack_state(
|
||||
}
|
||||
|
||||
|
||||
def compute_na_state(
|
||||
comments: list[dict[str, Any]],
|
||||
pr_author: str,
|
||||
na_gates: dict[str, dict[str, Any]],
|
||||
numeric_aliases: dict[int, str],
|
||||
team_membership_probe: "callable[[str, list[str]], list[str]]",
|
||||
client: "GiteaClient",
|
||||
org: str,
|
||||
) -> dict[str, dict[str, Any]]:
|
||||
"""Compute per-gate N/A declaration state.
|
||||
|
||||
Returns a dict keyed by gate name:
|
||||
{
|
||||
"qa-review": {
|
||||
"declared": ["alice"], # non-author, team-verified, not revoked
|
||||
"rejected": ["eve (not-in-team)", "bob (self-decl)"],
|
||||
"reason": "pure-infra change — no qa surface",
|
||||
},
|
||||
...
|
||||
}
|
||||
A gate is N/A-satisfied when at least one declaration from a valid
|
||||
team member exists and has not been revoked by the same user.
|
||||
"""
|
||||
if not na_gates:
|
||||
return {}
|
||||
|
||||
# Collapse directives per (commenter, gate) — most recent wins.
|
||||
latest_na: dict[tuple[str, str], str] = {} # (user, gate) → "sop-n/a"
|
||||
latest_na_reason: dict[tuple[str, str], str] = {} # (user, gate) → reason
|
||||
for c in comments:
|
||||
body = c.get("body", "") or ""
|
||||
user = (c.get("user") or {}).get("login", "")
|
||||
if not user:
|
||||
continue
|
||||
_directives, na_directives = parse_directives(body, numeric_aliases)
|
||||
for _kind, gate, reason in na_directives:
|
||||
if gate not in na_gates:
|
||||
continue
|
||||
latest_na[(user, gate)] = "sop-n/a"
|
||||
latest_na_reason[(user, gate)] = reason
|
||||
|
||||
# Determine candidate declarers per gate.
|
||||
na_state: dict[str, dict[str, Any]] = {
|
||||
gate: {"declared": [], "rejected": [], "reason": ""}
|
||||
for gate in na_gates
|
||||
}
|
||||
pending_per_gate: dict[str, list[str]] = {gate: [] for gate in na_gates}
|
||||
|
||||
for (user, gate), kind in latest_na.items():
|
||||
if kind != "sop-n/a":
|
||||
continue
|
||||
if user == pr_author:
|
||||
na_state[gate]["rejected"].append(f"{user} (self-decl)")
|
||||
continue
|
||||
pending_per_gate[gate].append(user)
|
||||
|
||||
# Probe team membership per gate using that gate's required_teams.
|
||||
for gate, candidates in pending_per_gate.items():
|
||||
if not candidates:
|
||||
continue
|
||||
required_teams = na_gates[gate].get("required_teams", [])
|
||||
# Resolve team names → ids using the client's resolver.
|
||||
team_ids: list[int] = []
|
||||
for tn in required_teams:
|
||||
tid = client.resolve_team_id(org, tn)
|
||||
if tid is not None:
|
||||
team_ids.append(tid)
|
||||
if not team_ids:
|
||||
na_state[gate]["rejected"].extend(
|
||||
f"{u} (no-team-id)" for u in candidates
|
||||
)
|
||||
continue
|
||||
for u in candidates:
|
||||
in_any_team = False
|
||||
for tid in team_ids:
|
||||
result = client.is_team_member(tid, u)
|
||||
if result is True:
|
||||
in_any_team = True
|
||||
break
|
||||
if result is None:
|
||||
# 403 — token owner not in team. Fail-closed.
|
||||
print(
|
||||
f"::warning::na: team-probe for {u} in team-id {tid} "
|
||||
"returned 403 — treating as not-in-team (fail-closed)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
if in_any_team:
|
||||
na_state[gate]["declared"].append(u)
|
||||
else:
|
||||
na_state[gate]["rejected"].append(f"{u} (not-in-team)")
|
||||
|
||||
# Build per-gate reason string from declared users.
|
||||
for gate in na_gates:
|
||||
decl = na_state[gate]["declared"]
|
||||
if decl:
|
||||
reasons: list[str] = []
|
||||
for u in decl:
|
||||
r = latest_na_reason.get((u, gate), "")
|
||||
if r:
|
||||
reasons.append(f"{u}: {r}")
|
||||
else:
|
||||
reasons.append(u)
|
||||
na_state[gate]["reason"] = "; ".join(reasons)
|
||||
|
||||
return na_state
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea API client
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -799,7 +701,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
numeric_aliases = {
|
||||
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
|
||||
}
|
||||
na_gates: dict[str, dict[str, Any]] = cfg.get("n/a_gates") or {}
|
||||
|
||||
client = GiteaClient(args.gitea_host, token) if token else None
|
||||
if not client:
|
||||
@@ -819,8 +720,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
print("::error::PR payload missing user.login or head.sha", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
|
||||
|
||||
comments = client.get_issue_comments(args.owner, args.repo, args.pr)
|
||||
|
||||
# Build team-membership probe closure that caches results per
|
||||
@@ -878,47 +777,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe)
|
||||
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
|
||||
|
||||
# --- N/A gate state (RFC#324 §N/A follow-up) ---
|
||||
na_state: dict[str, dict[str, Any]] = {}
|
||||
if na_gates:
|
||||
na_state = compute_na_state(
|
||||
comments, author, na_gates, numeric_aliases,
|
||||
probe, client, args.owner,
|
||||
)
|
||||
# Post N/A declarations status (read by review-check.sh).
|
||||
na_satisfied = [g for g, s in na_state.items() if s["declared"]]
|
||||
na_missing = [g for g, s in na_state.items() if not s["declared"]]
|
||||
if na_satisfied:
|
||||
na_desc = f"N/A: {', '.join(na_satisfied)}"
|
||||
na_post_state = "success"
|
||||
elif na_missing:
|
||||
na_desc = f"awaiting /sop-n/a declaration for: {', '.join(na_missing)}"
|
||||
na_post_state = "pending"
|
||||
else:
|
||||
# Configured but no declarations yet.
|
||||
na_desc = "no /sop-n/a declarations yet"
|
||||
na_post_state = "pending"
|
||||
na_context = "sop-checklist / na-declarations (pull_request)"
|
||||
print(f"::notice::na-declarations status: {na_post_state} — {na_desc}")
|
||||
if not args.dry_run:
|
||||
client.post_status(
|
||||
args.owner, args.repo, head_sha,
|
||||
state=na_post_state, context=na_context,
|
||||
description=na_desc,
|
||||
target_url=target_url,
|
||||
)
|
||||
print(f"::notice::na-declarations status posted: {na_context} → {na_post_state}")
|
||||
# Log per-gate diagnostics.
|
||||
for gate in na_gates:
|
||||
s = na_state.get(gate, {})
|
||||
if s.get("declared"):
|
||||
print(f"::notice:: [PASS] gate={gate} — N/A declared by {','.join(s['declared'])}"
|
||||
+ (f" ({s['reason']})" if s.get("reason") else ""))
|
||||
else:
|
||||
extra = f" — rejected: {', '.join(s.get('rejected', []))}" if s.get("rejected") else ""
|
||||
print(f"::notice:: [WAIT] gate={gate} — no valid N/A declaration yet{extra}")
|
||||
|
||||
|
||||
state, description = render_status(items, ack_state, body_state)
|
||||
mode = get_tier_mode(pr, cfg)
|
||||
if mode == "soft":
|
||||
@@ -953,6 +811,7 @@ def main(argv: list[str] | None = None) -> int:
|
||||
return 0 if state in ("success", "pending") else 1
|
||||
return 0
|
||||
|
||||
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
|
||||
client.post_status(
|
||||
args.owner, args.repo, head_sha,
|
||||
state=state, context=args.status_context,
|
||||
|
||||
+27
-27
@@ -133,7 +133,6 @@ jobs:
|
||||
# the name match works on PRs that don't touch workspace-server/).
|
||||
platform-build:
|
||||
name: Platform (Go)
|
||||
needs: changes
|
||||
runs-on: ubuntu-latest
|
||||
# mc#774 (closed 2026-05-14): Phase 4 flip of the platform-build job.
|
||||
# Phase 4 (#656) originally flipped this to continue-on-error: false based on
|
||||
@@ -154,29 +153,29 @@ jobs:
|
||||
run:
|
||||
working-directory: workspace-server
|
||||
steps:
|
||||
- if: needs.changes.outputs.platform != 'true'
|
||||
- if: false
|
||||
working-directory: .
|
||||
run: echo "No platform/** changes — skipping real build steps; this job always runs to satisfy the required-check name on branch protection."
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
|
||||
with:
|
||||
go-version: 'stable'
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
run: go mod download
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
run: go build ./cmd/server
|
||||
# CLI (molecli) moved to standalone repo: git.moleculesai.app/molecule-ai/molecule-cli
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
run: go vet ./...
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
name: Install golangci-lint
|
||||
run: go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.12.2
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
name: Run golangci-lint
|
||||
run: $(go env GOPATH)/bin/golangci-lint run --timeout 3m ./...
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
name: Diagnostic — per-package verbose 60s
|
||||
run: |
|
||||
set +e
|
||||
@@ -192,7 +191,7 @@ jobs:
|
||||
echo "::endgroup::"
|
||||
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
|
||||
continue-on-error: true
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
name: Run tests with race detection and coverage
|
||||
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
|
||||
# full ./... suite with race detection + coverage. A 10m per-step timeout
|
||||
@@ -200,7 +199,7 @@ jobs:
|
||||
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
|
||||
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
|
||||
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
name: Per-file coverage report
|
||||
# Advisory — lists every source file with its coverage so reviewers
|
||||
# can see at-a-glance where gaps are. Sorted ascending so the worst
|
||||
@@ -214,7 +213,7 @@ jobs:
|
||||
END {for (f in s) printf "%6.1f%% %s\n", s[f]/c[f], f}' \
|
||||
| sort -n
|
||||
|
||||
- if: needs.changes.outputs.platform == 'true'
|
||||
- if: always()
|
||||
name: Check coverage thresholds
|
||||
# Enforces two gates from #1823 Layer 1:
|
||||
# 1. Total floor (25% — ratchet plan in COVERAGE_FLOOR.md).
|
||||
@@ -302,7 +301,6 @@ jobs:
|
||||
# siblings — verified empirically on PR #2314).
|
||||
canvas-build:
|
||||
name: Canvas (Next.js)
|
||||
needs: changes
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 20
|
||||
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
|
||||
@@ -311,20 +309,20 @@ jobs:
|
||||
run:
|
||||
working-directory: canvas
|
||||
steps:
|
||||
- if: needs.changes.outputs.canvas != 'true'
|
||||
- if: false
|
||||
working-directory: .
|
||||
run: echo "No canvas/** changes — skipping real build steps; this job always runs to satisfy the required-check name on branch protection."
|
||||
- if: needs.changes.outputs.canvas == 'true'
|
||||
- if: always()
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- if: needs.changes.outputs.canvas == 'true'
|
||||
- if: always()
|
||||
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
|
||||
with:
|
||||
node-version: '22'
|
||||
- if: needs.changes.outputs.canvas == 'true'
|
||||
- if: always()
|
||||
run: rm -f package-lock.json && npm install
|
||||
- if: needs.changes.outputs.canvas == 'true'
|
||||
- if: always()
|
||||
run: npm run build
|
||||
- if: needs.changes.outputs.canvas == 'true'
|
||||
- if: always()
|
||||
name: Run tests with coverage
|
||||
# Coverage instrumentation is configured in canvas/vitest.config.ts
|
||||
# (provider: v8, reporters: text + html + json-summary). Step 2 of
|
||||
@@ -333,7 +331,7 @@ jobs:
|
||||
# tracked in #1815) after the team sees what current coverage is.
|
||||
run: npx vitest run --coverage
|
||||
- name: Upload coverage summary as artifact
|
||||
if: needs.changes.outputs.canvas == 'true' && always()
|
||||
if: always()
|
||||
# Pinned to v3 for Gitea act_runner v0.6 compatibility — v4+ uses
|
||||
# the GHES 3.10+ artifact protocol that Gitea 1.22.x does NOT
|
||||
# implement, surfacing as `GHESNotSupportedError: @actions/artifact
|
||||
@@ -400,6 +398,8 @@ jobs:
|
||||
scripts/promote-tenant-image.sh \
|
||||
scripts/test-promote-tenant-image.sh
|
||||
|
||||
# mc#959 root-fix (sre)
|
||||
|
||||
canvas-deploy-reminder:
|
||||
name: Canvas Deploy Reminder
|
||||
runs-on: ubuntu-latest
|
||||
@@ -408,8 +408,8 @@ jobs:
|
||||
# The step-level exit 0 handles the "not main push" case; the job-level
|
||||
# `if:` makes the gating explicit so the drift script sees it.
|
||||
# continue-on-error removed (was mc#774 mask): step exits 0 when not applicable.
|
||||
if: ${{ github.ref == 'refs/heads/staging' }}
|
||||
needs: [changes, canvas-build]
|
||||
if: ${{ github.ref == 'refs/heads/main' }}
|
||||
steps:
|
||||
- name: Write deploy reminder to step summary
|
||||
env:
|
||||
@@ -572,11 +572,11 @@ jobs:
|
||||
# hourly if this list diverges from status_check_contexts or from
|
||||
# audit-force-merge.yml's REQUIRED_CHECKS env (RFC §4 + §6).
|
||||
#
|
||||
# canvas-deploy-reminder IS now included in all-required.needs (mc#958 root-fix):
|
||||
# added job-level `if: github.ref == 'refs/heads/main'` so ci-required-drift.py's
|
||||
# ci_job_names() detects it as github.ref-gated and skips it from F1.
|
||||
# The step-level `if: ... || REF_NAME != refs/heads/main` exits 0 when not main,
|
||||
# so the job succeeds (not skipped) on non-main pushes — sentinel treats as green.
|
||||
# canvas-deploy-reminder is intentionally excluded from all-required.needs:
|
||||
# it needs canvas-build, which is skipped on CI-only PRs (canvas=false).
|
||||
# Including it in all-required.needs causes all-required to hang on
|
||||
# every CI-only PR. Keep it runnable on PRs via its own
|
||||
# `needs: [changes, canvas-build]` — the sentinel only aggregates the result.
|
||||
#
|
||||
# Phase 3 (RFC #219 §1) safety: underlying build jobs carry
|
||||
# continue-on-error: true so their failures are masked to null (2026-05-12: re-enabled mc#774 interim)
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
staging trigger 2026-05-14T17:35:02Z
|
||||
staging trigger
|
||||
@@ -1 +0,0 @@
|
||||
trigger
|
||||
@@ -62,21 +62,12 @@ export function ThemeToggle({ className = "" }: { className?: string }) {
|
||||
}
|
||||
setTheme(OPTIONS[next].value);
|
||||
// Move focus to the new button so arrow-key navigation is continuous.
|
||||
// Use direct-child query to scope strictly to this radiogroup's buttons
|
||||
// and avoid accidentally focusing unrelated [role=radio] elements
|
||||
// Query is already scoped to radiogroup so no child-combinator needed;
|
||||
// avoids accidentally focusing unrelated [role=radio] elements
|
||||
// elsewhere in the DOM (e.g. React Flow canvas nodes).
|
||||
// Guard: skip focus if the current target is no longer in the document
|
||||
// (e.g. React StrictMode double-invokes handlers during re-render).
|
||||
if (!e.currentTarget.isConnected) return;
|
||||
const radiogroup = e.currentTarget.closest("[role=radiogroup]") as HTMLElement | null;
|
||||
if (!radiogroup) return;
|
||||
// Use children[] instead of querySelectorAll("> [role=radio]") to avoid
|
||||
// jsdom's child-combinator selector parsing issues in test environments.
|
||||
const btns = Array.from(radiogroup.children).filter(
|
||||
(el): el is HTMLButtonElement =>
|
||||
el.tagName === "BUTTON" && el.getAttribute("role") === "radio"
|
||||
);
|
||||
if (next < btns.length) btns[next]?.focus();
|
||||
const btns = radiogroup?.querySelectorAll<HTMLButtonElement>("[role=radio]");
|
||||
btns?.[next]?.focus();
|
||||
},
|
||||
[]
|
||||
);
|
||||
|
||||
@@ -24,12 +24,8 @@ vi.mock("@/lib/theme-provider", () => ({
|
||||
})),
|
||||
}));
|
||||
|
||||
// Wrap cleanup in act() so any pending React state updates (e.g. from
|
||||
// keyDown handlers that call setTheme) flush before DOM unmount. Without
|
||||
// this, cleanup() can race against pending renders and cause INDEX_SIZE_ERR
|
||||
// when the handleKeyDown callback tries to query the DOM mid-teardown.
|
||||
afterEach(() => {
|
||||
act(() => { cleanup(); });
|
||||
cleanup();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
@@ -150,7 +146,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
|
||||
const radios = screen.getAllByRole("radio");
|
||||
// dark (index 2) is current; ArrowRight should wrap to light (index 0)
|
||||
act(() => { radios[2].focus(); });
|
||||
act(() => { fireEvent.keyDown(radios[2], { key: "ArrowRight" }); });
|
||||
fireEvent.keyDown(radios[2], { key: "ArrowRight" });
|
||||
expect(mockSetTheme).toHaveBeenCalledWith("light");
|
||||
});
|
||||
|
||||
@@ -164,7 +160,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
|
||||
const radios = screen.getAllByRole("radio");
|
||||
// light (index 0) is current; ArrowLeft should go to dark (index 2)
|
||||
act(() => { radios[0].focus(); });
|
||||
act(() => { fireEvent.keyDown(radios[0], { key: "ArrowLeft" }); });
|
||||
fireEvent.keyDown(radios[0], { key: "ArrowLeft" });
|
||||
expect(mockSetTheme).toHaveBeenCalledWith("dark");
|
||||
});
|
||||
|
||||
@@ -178,7 +174,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
|
||||
const radios = screen.getAllByRole("radio");
|
||||
// light (index 0) is current; ArrowDown should go to system (index 1)
|
||||
act(() => { radios[0].focus(); });
|
||||
act(() => { fireEvent.keyDown(radios[0], { key: "ArrowDown" }); });
|
||||
fireEvent.keyDown(radios[0], { key: "ArrowDown" });
|
||||
expect(mockSetTheme).toHaveBeenCalledWith("system");
|
||||
});
|
||||
|
||||
@@ -191,7 +187,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
|
||||
render(<ThemeToggle />);
|
||||
const radios = screen.getAllByRole("radio");
|
||||
act(() => { radios[2].focus(); });
|
||||
act(() => { fireEvent.keyDown(radios[2], { key: "Home" }); });
|
||||
fireEvent.keyDown(radios[2], { key: "Home" });
|
||||
expect(mockSetTheme).toHaveBeenCalledWith("light");
|
||||
});
|
||||
|
||||
@@ -204,14 +200,14 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
|
||||
render(<ThemeToggle />);
|
||||
const radios = screen.getAllByRole("radio");
|
||||
act(() => { radios[0].focus(); });
|
||||
act(() => { fireEvent.keyDown(radios[0], { key: "End" }); });
|
||||
fireEvent.keyDown(radios[0], { key: "End" });
|
||||
expect(mockSetTheme).toHaveBeenCalledWith("dark");
|
||||
});
|
||||
|
||||
it("does nothing on unrelated keys", () => {
|
||||
render(<ThemeToggle />);
|
||||
const radios = screen.getAllByRole("radio");
|
||||
act(() => { fireEvent.keyDown(radios[0], { key: "Enter" }); });
|
||||
fireEvent.keyDown(radios[0], { key: "Enter" });
|
||||
expect(mockSetTheme).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -36,6 +36,20 @@ interface A2AResponseShape {
|
||||
error?: { message?: string };
|
||||
}
|
||||
|
||||
// Wire shape for GET /workspaces/:id/chat-history (chat_history.go → ChatHistoryResponse).
|
||||
interface ApiChatMessage {
|
||||
id: string;
|
||||
role: string; // "user" | "agent" | "system"
|
||||
content: string;
|
||||
timestamp: string;
|
||||
attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }>;
|
||||
}
|
||||
|
||||
interface ChatHistoryResponse {
|
||||
messages: ApiChatMessage[];
|
||||
reached_end: boolean;
|
||||
}
|
||||
|
||||
const formatTime = (date: Date) =>
|
||||
date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" });
|
||||
|
||||
@@ -61,18 +75,14 @@ export function MobileChat({
|
||||
// that creates a new [] reference on every store update when the key is
|
||||
// absent, causing infinite re-render (React error #185).
|
||||
const storedMessages = useCanvasStore((s) => s.agentMessages[agentId]);
|
||||
const [messages, setMessages] = useState<ChatMessage[]>(() =>
|
||||
(storedMessages ?? []).map((m) => ({
|
||||
id: m.id,
|
||||
role: "agent",
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
})),
|
||||
);
|
||||
// Start empty — history is loaded via useEffect below.
|
||||
const [messages, setMessages] = useState<ChatMessage[]>([]);
|
||||
const [draft, setDraft] = useState("");
|
||||
const [tab, setTab] = useState<SubTab>("my");
|
||||
const [sending, setSending] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [loading, setLoading] = useState(true); // history is loading on mount
|
||||
const [historyError, setHistoryError] = useState<string | null>(null);
|
||||
const scrollRef = useRef<HTMLDivElement>(null);
|
||||
// Synchronous re-entry guard. `setSending(true)` schedules a state
|
||||
// update but doesn't flush before a second tap can fire send() — a ref
|
||||
@@ -80,6 +90,9 @@ export function MobileChat({
|
||||
// double-send race a stale `sending` lets through.
|
||||
const sendInFlightRef = useRef(false);
|
||||
const composerRef = useRef<HTMLTextAreaElement>(null);
|
||||
// Guard: don't treat the initial store population as a live push.
|
||||
// Set to false after the first render completes.
|
||||
const initDoneRef = useRef(false);
|
||||
|
||||
// Auto-grow the textarea: reset height to 'auto' so the scrollHeight
|
||||
// shrinks when the user deletes text, then size to scrollHeight up to
|
||||
@@ -92,6 +105,75 @@ export function MobileChat({
|
||||
el.style.height = `${next}px`;
|
||||
}, [draft]);
|
||||
|
||||
// Fetch chat history on mount; keep merging live agentMessages while the
|
||||
// panel is open. InitDoneRef prevents the initial store snapshot from
|
||||
// triggering the live-merge path (the store buffer is populated by
|
||||
// ChatTab on desktop, not on mobile — this effect loads history as the
|
||||
// mobile-native path).
|
||||
useEffect(() => {
|
||||
let cancelled = false;
|
||||
|
||||
const mapApiMessage = (m: ApiChatMessage): ChatMessage => ({
|
||||
id: m.id,
|
||||
role: m.role === "user" ? "user" : "agent",
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
});
|
||||
|
||||
const syncLive = () => {
|
||||
const live = useCanvasStore.getState().agentMessages[agentId] ?? [];
|
||||
if (live.length > 0) {
|
||||
setMessages((prev) => {
|
||||
const existingIds = new Set(prev.map((m) => m.id));
|
||||
const newOnes = live
|
||||
.filter((m) => !existingIds.has(m.id))
|
||||
.map((m) => ({
|
||||
id: m.id,
|
||||
role: "agent" as const,
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
}));
|
||||
return newOnes.length > 0 ? [...prev, ...newOnes] : prev;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const bootstrap = async (): Promise<(() => void) | undefined> => {
|
||||
setLoading(true);
|
||||
setHistoryError(null);
|
||||
try {
|
||||
const res = await api.get<ChatHistoryResponse>(
|
||||
`/workspaces/${agentId}/chat-history?limit=50`,
|
||||
);
|
||||
if (cancelled) return;
|
||||
const initial = (res.messages ?? []).map(mapApiMessage);
|
||||
setMessages(initial);
|
||||
// Mark init done BEFORE marking loading=false so any store push
|
||||
// that arrives in the same tick is treated as live, not init.
|
||||
initDoneRef.current = true;
|
||||
setLoading(false);
|
||||
// Subscribe to live pushes after init is complete.
|
||||
syncLive();
|
||||
const unsubscribe = useCanvasStore.subscribe(syncLive);
|
||||
return unsubscribe; // returned for cleanup
|
||||
} catch (e) {
|
||||
if (cancelled) return;
|
||||
setHistoryError(e instanceof Error ? e.message : "Failed to load chat history");
|
||||
setLoading(false);
|
||||
initDoneRef.current = true;
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
|
||||
let maybeUnsubscribe: (() => void) | undefined;
|
||||
bootstrap().then((fn) => { maybeUnsubscribe = fn; });
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (maybeUnsubscribe) maybeUnsubscribe();
|
||||
};
|
||||
}, [agentId]);
|
||||
|
||||
useEffect(() => {
|
||||
if (scrollRef.current) {
|
||||
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
|
||||
@@ -311,7 +393,61 @@ export function MobileChat({
|
||||
Agent Comms — peer-to-peer A2A traffic surfaces in the Comms tab.
|
||||
</div>
|
||||
)}
|
||||
{tab === "my" && messages.length === 0 && (
|
||||
{tab === "my" && loading && (
|
||||
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
|
||||
<div style={{ marginBottom: 6, opacity: 0.6, animation: "spin 1s linear infinite", display: "inline-block", fontSize: 16 }}>⟳</div>
|
||||
<div>Loading chat history…</div>
|
||||
</div>
|
||||
)}
|
||||
{tab === "my" && !loading && historyError && (
|
||||
<div
|
||||
role="alert"
|
||||
style={{
|
||||
padding: "14px 4px",
|
||||
textAlign: "center",
|
||||
color: p.failed,
|
||||
fontSize: 13,
|
||||
}}
|
||||
>
|
||||
<div style={{ marginBottom: 8 }}>Could not load chat history.</div>
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => {
|
||||
setLoading(true);
|
||||
setHistoryError(null);
|
||||
api.get(`/workspaces/${agentId}/chat-history?limit=50`).then(
|
||||
(res: unknown) => {
|
||||
const r = res as ChatHistoryResponse;
|
||||
setMessages((r.messages ?? []).map((m) => ({
|
||||
id: m.id,
|
||||
role: m.role === "user" ? "user" : "agent",
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
})));
|
||||
setLoading(false);
|
||||
initDoneRef.current = true;
|
||||
},
|
||||
).catch((e: unknown) => {
|
||||
setHistoryError(e instanceof Error ? e.message : "Failed to load");
|
||||
setLoading(false);
|
||||
initDoneRef.current = true;
|
||||
});
|
||||
}}
|
||||
style={{
|
||||
padding: "6px 14px",
|
||||
borderRadius: 14,
|
||||
border: `0.5px solid ${p.failed}`,
|
||||
background: "transparent",
|
||||
color: p.failed,
|
||||
fontSize: 12,
|
||||
cursor: "pointer",
|
||||
}}
|
||||
>
|
||||
Retry
|
||||
</button>
|
||||
</div>
|
||||
)}
|
||||
{tab === "my" && !loading && !historyError && messages.length === 0 && (
|
||||
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
|
||||
Send a message to start chatting.
|
||||
</div>
|
||||
|
||||
@@ -8,11 +8,19 @@
|
||||
* NOTE: No @testing-library/jest-dom — use DOM APIs.
|
||||
*/
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { cleanup, render } from "@testing-library/react";
|
||||
import { act, cleanup, render, waitFor } from "@testing-library/react";
|
||||
import React from "react";
|
||||
|
||||
import { MobileChat } from "../MobileChat";
|
||||
|
||||
// ─── Mock API ─────────────────────────────────────────────────────────────────
|
||||
// vi.mock without a factory auto-mocks the module. In tests, we configure
|
||||
// api.get / api.post directly (they are vi.fn() from the auto-mock).
|
||||
// Tests that need specific behaviour use mockResolvedValueOnce on the
|
||||
// auto-mocked functions.
|
||||
vi.mock("@/lib/api");
|
||||
import { api } from "@/lib/api";
|
||||
|
||||
// ─── Mock store ───────────────────────────────────────────────────────────────
|
||||
|
||||
const mockAgentId = "ws-chat-test";
|
||||
@@ -32,8 +40,14 @@ const mockStoreState = {
|
||||
|
||||
vi.mock("@/store/canvas", () => ({
|
||||
useCanvasStore: Object.assign(
|
||||
vi.fn((sel) => sel(mockStoreState)),
|
||||
{ getState: () => mockStoreState },
|
||||
vi.fn((sel?: (state: typeof mockStoreState) => unknown) => {
|
||||
if (sel) return sel(mockStoreState);
|
||||
return mockStoreState;
|
||||
}),
|
||||
{
|
||||
getState: () => mockStoreState,
|
||||
subscribe: vi.fn(() => vi.fn()),
|
||||
},
|
||||
),
|
||||
summarizeWorkspaceCapabilities: vi.fn((data: Record<string, unknown>) => {
|
||||
const agentCard = data.agentCard as Record<string, unknown> | null;
|
||||
@@ -54,16 +68,6 @@ vi.mock("@/store/canvas", () => ({
|
||||
}),
|
||||
}));
|
||||
|
||||
// ─── Mock API ─────────────────────────────────────────────────────────────────
|
||||
|
||||
const { mockApiPost } = vi.hoisted(() => ({
|
||||
mockApiPost: vi.fn().mockResolvedValue({ result: { parts: [] } }),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/api", () => ({
|
||||
api: { post: mockApiPost },
|
||||
}));
|
||||
|
||||
// ─── Fixtures ────────────────────────────────────────────────────────────────
|
||||
|
||||
const onlineNode = {
|
||||
@@ -150,7 +154,15 @@ beforeEach(() => {
|
||||
mockOnBack.mockClear();
|
||||
mockStoreState.nodes = [];
|
||||
mockStoreState.agentMessages = {};
|
||||
mockApiPost.mockClear();
|
||||
// Set up spies on the real api methods. Tests override these per-call.
|
||||
const getSpy = vi.spyOn(api, "get");
|
||||
const postSpy = vi.spyOn(api, "post");
|
||||
getSpy.mockResolvedValue({ messages: [], reached_end: true });
|
||||
postSpy.mockResolvedValue({ result: { parts: [] } });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -266,15 +278,26 @@ describe("MobileChat — empty state", () => {
|
||||
mockStoreState.nodes = [onlineNode];
|
||||
});
|
||||
|
||||
it('shows "Send a message to start chatting." when no messages', () => {
|
||||
const { container } = renderChat(mockAgentId);
|
||||
it('shows "Send a message to start chatting." when no messages', async () => {
|
||||
// History fetch resolves immediately in tests (mockResolvedValue).
|
||||
// act() flushes the microtask queue so the component reaches its
|
||||
// post-load state before we assert.
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
const { container } = renderResult!;
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
|
||||
});
|
||||
|
||||
it("shows no messages when agentMessages[agentId] is absent (undefined)", () => {
|
||||
it("shows no messages when agentMessages[agentId] is absent (undefined)", async () => {
|
||||
// Explicitly set to empty to simulate no stored messages
|
||||
mockStoreState.agentMessages = {};
|
||||
const { container } = renderChat(mockAgentId);
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
const { container } = renderResult!;
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
|
||||
});
|
||||
});
|
||||
@@ -321,3 +344,132 @@ describe("MobileChat — dark mode", () => {
|
||||
expect(container.querySelector('[aria-label="Back"]')).toBeTruthy();
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Chat history loading ────────────────────────────────────────────────────
|
||||
|
||||
describe("MobileChat — chat history", () => {
|
||||
beforeEach(() => {
|
||||
mockStoreState.nodes = [onlineNode];
|
||||
});
|
||||
|
||||
it("calls GET /workspaces/:id/chat-history on mount", async () => {
|
||||
await act(async () => {
|
||||
renderChat(mockAgentId);
|
||||
});
|
||||
expect(api.get).toHaveBeenCalledWith(
|
||||
`/workspaces/${mockAgentId}/chat-history?limit=50`,
|
||||
);
|
||||
});
|
||||
|
||||
it("shows loading state while history is fetching", () => {
|
||||
// Do NOT await — check the pre-resolve state.
|
||||
const { container } = renderChat(mockAgentId);
|
||||
expect(container.textContent ?? "").toContain("Loading chat history…");
|
||||
});
|
||||
|
||||
it("shows empty state after history resolves with no messages", async () => {
|
||||
// beforeEach already sets api.get to resolve with empty — no override needed.
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
const { container } = renderResult!;
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
|
||||
});
|
||||
|
||||
it("renders messages from history response", async () => {
|
||||
vi.spyOn(api, "get").mockResolvedValueOnce({
|
||||
messages: [
|
||||
{
|
||||
id: "msg-1",
|
||||
role: "user",
|
||||
content: "Hello agent",
|
||||
timestamp: "2026-04-25T10:00:00Z",
|
||||
},
|
||||
{
|
||||
id: "msg-2",
|
||||
role: "agent",
|
||||
content: "Hello back",
|
||||
timestamp: "2026-04-25T10:00:01Z",
|
||||
},
|
||||
],
|
||||
reached_end: true,
|
||||
});
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
const { container } = renderResult!;
|
||||
expect(container.textContent ?? "").toContain("Hello agent");
|
||||
expect(container.textContent ?? "").toContain("Hello back");
|
||||
});
|
||||
|
||||
it("maps user role from API correctly", async () => {
|
||||
vi.spyOn(api, "get").mockResolvedValueOnce({
|
||||
messages: [
|
||||
{
|
||||
id: "msg-u",
|
||||
role: "user",
|
||||
content: "user message",
|
||||
timestamp: "2026-04-25T10:00:00Z",
|
||||
},
|
||||
],
|
||||
reached_end: true,
|
||||
});
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
// User messages render right-aligned. The text content check is sufficient
|
||||
// to confirm the message appeared.
|
||||
const { container } = renderResult!;
|
||||
expect(container.textContent ?? "").toContain("user message");
|
||||
});
|
||||
|
||||
it("shows error state when history fetch fails", async () => {
|
||||
vi.spyOn(api, "get").mockRejectedValue(new Error("Network error"));
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
const { container } = renderResult!;
|
||||
expect(container.textContent ?? "").toContain("Could not load chat history.");
|
||||
expect(container.textContent ?? "").toContain("Retry");
|
||||
});
|
||||
|
||||
it("Retry button re-fetches history after error", async () => {
|
||||
// Make the initial mount call fail so the Retry button appears, then
|
||||
// make the retry call succeed so we can verify the full flow.
|
||||
const getSpy = vi.spyOn(api, "get");
|
||||
getSpy
|
||||
.mockRejectedValueOnce(new Error("Network error"))
|
||||
.mockResolvedValueOnce({ messages: [], reached_end: true });
|
||||
|
||||
let renderResult: ReturnType<typeof renderChat>;
|
||||
await act(async () => {
|
||||
renderResult = renderChat(mockAgentId);
|
||||
});
|
||||
const { container } = renderResult!;
|
||||
|
||||
// Error state should be shown with Retry button.
|
||||
expect(container.textContent ?? "").toContain("Could not load chat history.");
|
||||
expect(container.textContent ?? "").toContain("Retry");
|
||||
|
||||
// Click Retry — the button's onClick fires api.get again.
|
||||
// The second mockResolvedValueOnce makes it succeed.
|
||||
const retryBtn = Array.from(container.querySelectorAll("button")).find(
|
||||
(b) => b.textContent?.trim() === "Retry",
|
||||
);
|
||||
expect(retryBtn).toBeTruthy();
|
||||
await act(async () => {
|
||||
retryBtn?.click();
|
||||
});
|
||||
|
||||
// waitFor polls until the retry resolves and component re-renders.
|
||||
await waitFor(() => {
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
|
||||
});
|
||||
// Initial call + retry = 2.
|
||||
expect(getSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -962,6 +962,32 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
{/* talk_to_user disabled banner — shown when the workspace has
|
||||
talk_to_user_enabled=false. The agent cannot send canvas messages;
|
||||
the user can re-enable the ability from here without opening settings. */}
|
||||
{data.talkToUserEnabled === false && (
|
||||
<div className="flex items-center gap-2 px-3 py-2 bg-surface-sunken border-b border-line/40 shrink-0">
|
||||
<svg width="14" height="14" viewBox="0 0 16 16" fill="none" aria-hidden="true" className="shrink-0 text-ink-mid">
|
||||
<path d="M8 1a7 7 0 1 0 0 14A7 7 0 0 0 8 1Zm0 10.5a.75.75 0 1 1 0-1.5.75.75 0 0 1 0 1.5ZM8 4a.75.75 0 0 1 .75.75v4a.75.75 0 0 1-1.5 0v-4A.75.75 0 0 1 8 4Z" fill="currentColor"/>
|
||||
</svg>
|
||||
<span className="text-[10px] text-ink-mid flex-1">
|
||||
Agent is not enabled to chat with you.
|
||||
</span>
|
||||
<button
|
||||
onClick={async () => {
|
||||
try {
|
||||
await api.patch(`/workspaces/${workspaceId}/abilities`, { talk_to_user_enabled: true });
|
||||
useCanvasStore.getState().updateNodeData(workspaceId, { talkToUserEnabled: true });
|
||||
} catch {
|
||||
// ignore — user will see no change and can retry
|
||||
}
|
||||
}}
|
||||
className="px-2 py-0.5 text-[10px] font-medium bg-accent/10 hover:bg-accent/20 text-accent rounded border border-accent/30 transition-colors shrink-0"
|
||||
>
|
||||
Enable
|
||||
</button>
|
||||
</div>
|
||||
)}
|
||||
{/* Messages */}
|
||||
<div ref={containerRef} className="flex-1 overflow-y-auto p-3 space-y-3">
|
||||
{loading && (
|
||||
|
||||
@@ -519,6 +519,10 @@ export function buildNodesAndEdges(
|
||||
// #2054 — server-declared per-workspace provisioning timeout.
|
||||
// Falls through to the runtime profile when null/absent.
|
||||
provisionTimeoutMs: ws.provision_timeout_ms ?? null,
|
||||
// Workspace abilities — defaults preserved for old platform versions
|
||||
// that don't yet include these columns in the GET response.
|
||||
broadcastEnabled: ws.broadcast_enabled ?? false,
|
||||
talkToUserEnabled: ws.talk_to_user_enabled ?? true,
|
||||
},
|
||||
};
|
||||
if (hasParent) {
|
||||
|
||||
@@ -99,6 +99,13 @@ export interface WorkspaceNodeData extends Record<string, unknown> {
|
||||
* @/lib/runtimeProfiles. Lets a slow runtime declare its cold-boot
|
||||
* expectation without a canvas release. */
|
||||
provisionTimeoutMs?: number | null;
|
||||
/** When true the workspace may POST /broadcast to send org-wide messages.
|
||||
* Default false. Toggled by user/admin via PATCH /workspaces/:id/abilities. */
|
||||
broadcastEnabled?: boolean;
|
||||
/** When false the workspace cannot deliver canvas chat messages.
|
||||
* send_message_to_user / POST /notify return 403 and the canvas
|
||||
* shows a "not enabled" state with a button to re-enable. Default true. */
|
||||
talkToUserEnabled?: boolean;
|
||||
}
|
||||
|
||||
export type PanelTab = "details" | "skills" | "chat" | "terminal" | "config" | "schedule" | "channels" | "files" | "memory" | "traces" | "events" | "activity" | "audit";
|
||||
|
||||
@@ -299,6 +299,9 @@ export interface WorkspaceData {
|
||||
* `@/lib/runtimeProfiles` when absent (the default behavior for any
|
||||
* template that hasn't yet declared the field). */
|
||||
provision_timeout_ms?: number | null;
|
||||
/** Workspace ability flags (migration 20260514). */
|
||||
broadcast_enabled?: boolean;
|
||||
talk_to_user_enabled?: boolean;
|
||||
}
|
||||
|
||||
let socket: ReconnectingSocket | null = null;
|
||||
|
||||
Executable
+296
@@ -0,0 +1,296 @@
|
||||
#!/usr/bin/env bash
|
||||
# E2E test: workspace broadcast and talk-to-user platform abilities.
|
||||
#
|
||||
# What this proves:
|
||||
# 1. talk_to_user_enabled (default true) — POST /notify works out-of-the-box.
|
||||
# 2. PATCH /workspaces/:id/abilities { talk_to_user_enabled: false } disables
|
||||
# delivery: /notify → 403 with error="talk_to_user_disabled" + delegate hint.
|
||||
# 3. Re-enabling talk_to_user_enabled restores delivery.
|
||||
# 4. broadcast_enabled (default false) — POST /broadcast → 403 when disabled.
|
||||
# 5. PATCH { broadcast_enabled: true } enables fan-out.
|
||||
# 6. POST /broadcast delivers to all non-sender, non-removed workspaces:
|
||||
# - Returns {"status":"sent","delivered":N}
|
||||
# - Receiver's activity log has a broadcast_receive entry with the message.
|
||||
# - Sender's activity log has a broadcast_sent entry.
|
||||
# 7. The sender itself does NOT receive a broadcast_receive entry.
|
||||
#
|
||||
# Usage: tests/e2e/test_workspace_abilities_e2e.sh
|
||||
# Prereqs: workspace-server on http://localhost:8080, MOLECULE_ENV != production
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
source "$(dirname "$0")/_lib.sh"
|
||||
|
||||
PASS=0
|
||||
FAIL=0
|
||||
SENDER_ID=""
|
||||
RECEIVER_ID=""
|
||||
|
||||
cleanup() {
|
||||
for wid in "$SENDER_ID" "$RECEIVER_ID"; do
|
||||
if [ -n "$wid" ]; then
|
||||
curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" > /dev/null || true
|
||||
fi
|
||||
done
|
||||
}
|
||||
trap cleanup EXIT INT TERM
|
||||
|
||||
assert() {
|
||||
local label="$1" actual="$2" expected="$3"
|
||||
if [ "$actual" = "$expected" ]; then
|
||||
echo " PASS — $label"
|
||||
PASS=$((PASS+1))
|
||||
else
|
||||
echo " FAIL — $label"
|
||||
echo " expected: $expected"
|
||||
echo " actual: $actual"
|
||||
FAIL=$((FAIL+1))
|
||||
fi
|
||||
}
|
||||
|
||||
assert_contains() {
|
||||
local label="$1" haystack="$2" needle="$3"
|
||||
if echo "$haystack" | grep -qF "$needle"; then
|
||||
echo " PASS — $label"
|
||||
PASS=$((PASS+1))
|
||||
else
|
||||
echo " FAIL — $label"
|
||||
echo " needle: $needle"
|
||||
echo " haystack: $haystack"
|
||||
FAIL=$((FAIL+1))
|
||||
fi
|
||||
}
|
||||
|
||||
assert_not_contains() {
|
||||
local label="$1" haystack="$2" needle="$3"
|
||||
if ! echo "$haystack" | grep -qF "$needle"; then
|
||||
echo " PASS — $label"
|
||||
PASS=$((PASS+1))
|
||||
else
|
||||
echo " FAIL — $label (unexpected match)"
|
||||
echo " needle: $needle"
|
||||
echo " haystack: $haystack"
|
||||
FAIL=$((FAIL+1))
|
||||
fi
|
||||
}
|
||||
|
||||
# ── Pre-sweep: remove any stale leftover workspaces from a prior aborted run ──
|
||||
echo "=== Setup ==="
|
||||
for NAME in "Abilities Sender" "Abilities Receiver"; do
|
||||
PRIOR=$(curl -s "$BASE/workspaces" | python3 -c "
|
||||
import json, sys
|
||||
try:
|
||||
print(' '.join(w['id'] for w in json.load(sys.stdin) if w.get('name') == '$NAME'))
|
||||
except Exception:
|
||||
pass
|
||||
")
|
||||
for _wid in $PRIOR; do
|
||||
echo "Sweeping leftover '$NAME' workspace: $_wid"
|
||||
curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" > /dev/null || true
|
||||
done
|
||||
done
|
||||
|
||||
R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
|
||||
-d '{"name":"Abilities Sender","tier":1}')
|
||||
SENDER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
|
||||
[ -n "$SENDER_ID" ] || { echo "Failed to create sender workspace: $R"; exit 1; }
|
||||
echo "Created sender workspace: $SENDER_ID"
|
||||
|
||||
R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
|
||||
-d '{"name":"Abilities Receiver","tier":1}')
|
||||
RECEIVER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
|
||||
[ -n "$RECEIVER_ID" ] || { echo "Failed to create receiver workspace: $R"; exit 1; }
|
||||
echo "Created receiver workspace: $RECEIVER_ID"
|
||||
|
||||
# Mint workspace-scoped bearer tokens (test-only endpoint, disabled in prod).
|
||||
SENDER_TOKEN=$(e2e_mint_test_token "$SENDER_ID")
|
||||
[ -n "$SENDER_TOKEN" ] || { echo "Failed to mint sender token"; exit 1; }
|
||||
SENDER_AUTH="Authorization: Bearer $SENDER_TOKEN"
|
||||
|
||||
# Admin token — any live workspace bearer satisfies AdminAuth in local dev.
|
||||
# In production-like envs, set MOLECULE_ADMIN_TOKEN.
|
||||
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:-$SENDER_TOKEN}"
|
||||
ADMIN_AUTH="Authorization: Bearer $ADMIN_TOKEN"
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
echo ""
|
||||
echo "=== Part 1: talk_to_user ability ==="
|
||||
|
||||
echo ""
|
||||
echo "--- 1a: /notify works with default talk_to_user_enabled=true ---"
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":"Hello from sender"}')
|
||||
assert "POST /notify returns 200 when talk_to_user_enabled=true (default)" "$CODE" "200"
|
||||
|
||||
echo ""
|
||||
echo "--- 1b: Disable talk_to_user ---"
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
|
||||
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
|
||||
-d '{"talk_to_user_enabled": false}')
|
||||
assert "PATCH /abilities talk_to_user_enabled=false returns 200" "$CODE" "200"
|
||||
|
||||
# Verify the flag is reflected in the workspace GET response.
|
||||
WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
|
||||
FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
|
||||
assert "GET /workspaces/:id reflects talk_to_user_enabled=false" "$FLAG" "False"
|
||||
|
||||
echo ""
|
||||
echo "--- 1c: /notify blocked when talk_to_user disabled ---"
|
||||
BODY=$(curl -s -w "" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":"Should be blocked"}')
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":"Should be blocked"}')
|
||||
assert "POST /notify returns 403 when talk_to_user_enabled=false" "$CODE" "403"
|
||||
|
||||
ERR=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("error",""))' 2>/dev/null || echo "")
|
||||
assert_contains "403 body contains talk_to_user_disabled error code" "$ERR" "talk_to_user_disabled"
|
||||
|
||||
HINT=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("hint",""))' 2>/dev/null || echo "")
|
||||
assert_contains "403 body contains delegate_task hint" "$HINT" "delegate_task"
|
||||
|
||||
echo ""
|
||||
echo "--- 1d: Re-enable talk_to_user and verify /notify works again ---"
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
|
||||
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
|
||||
-d '{"talk_to_user_enabled": true}')
|
||||
assert "PATCH /abilities talk_to_user_enabled=true returns 200" "$CODE" "200"
|
||||
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":"Re-enabled, should work"}')
|
||||
assert "POST /notify returns 200 after re-enabling talk_to_user" "$CODE" "200"
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
echo ""
|
||||
echo "=== Part 2: broadcast ability ==="
|
||||
|
||||
echo ""
|
||||
echo "--- 2a: Broadcast blocked by default (broadcast_enabled=false) ---"
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":"Should be blocked"}')
|
||||
assert "POST /broadcast returns 403 when broadcast_enabled=false (default)" "$CODE" "403"
|
||||
|
||||
echo ""
|
||||
echo "--- 2b: Enable broadcast ---"
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
|
||||
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
|
||||
-d '{"broadcast_enabled": true}')
|
||||
assert "PATCH /abilities broadcast_enabled=true returns 200" "$CODE" "200"
|
||||
|
||||
WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
|
||||
FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
|
||||
assert "GET /workspaces/:id reflects broadcast_enabled=true" "$FLAG" "True"
|
||||
|
||||
echo ""
|
||||
echo "--- 2c: Successful broadcast fan-out ---"
|
||||
BCAST=$(curl -s -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":"Org-wide notice: scheduled maintenance in 5 minutes."}')
|
||||
BSTATUS=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("status",""))' 2>/dev/null || echo "")
|
||||
BDELIVERED=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("delivered","-1"))' 2>/dev/null || echo "-1")
|
||||
assert "POST /broadcast returns status=sent" "$BSTATUS" "sent"
|
||||
|
||||
# delivered count must be >= 1 (the receiver workspace).
|
||||
echo " INFO — broadcast delivered=$BDELIVERED"
|
||||
if python3 -c "import sys; sys.exit(0 if int('$BDELIVERED') >= 1 else 1)" 2>/dev/null; then
|
||||
echo " PASS — delivered count >= 1"
|
||||
PASS=$((PASS+1))
|
||||
else
|
||||
echo " FAIL — expected delivered >= 1, got $BDELIVERED"
|
||||
FAIL=$((FAIL+1))
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "--- 2d: Receiver activity log has broadcast_receive entry ---"
|
||||
RECEIVER_TOKEN=$(e2e_mint_test_token "$RECEIVER_ID")
|
||||
[ -n "$RECEIVER_TOKEN" ] || { echo "Failed to mint receiver token"; exit 1; }
|
||||
RECEIVER_AUTH="Authorization: Bearer $RECEIVER_TOKEN"
|
||||
|
||||
ACT=$(curl -s -H "$RECEIVER_AUTH" "$BASE/workspaces/$RECEIVER_ID/activity?source=agent&limit=20")
|
||||
ROW=$(echo "$ACT" | python3 -c '
|
||||
import json, sys
|
||||
rows = json.load(sys.stdin) or []
|
||||
for r in rows:
|
||||
if r.get("activity_type") == "broadcast_receive":
|
||||
print(json.dumps(r))
|
||||
break
|
||||
')
|
||||
[ -n "$ROW" ] || {
|
||||
echo " FAIL — could not find broadcast_receive row in receiver activity"
|
||||
FAIL=$((FAIL+1))
|
||||
}
|
||||
|
||||
if [ -n "$ROW" ]; then
|
||||
# Message is stored in summary field.
|
||||
MSG=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("summary",""))')
|
||||
assert_contains "broadcast_receive row summary has original message" "$MSG" "scheduled maintenance"
|
||||
# Sender ID is stored in source_id field.
|
||||
SRC=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("source_id",""))')
|
||||
assert "broadcast_receive row source_id is sender workspace" "$SRC" "$SENDER_ID"
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "--- 2e: Sender activity log has broadcast_sent entry ---"
|
||||
ACT_SENDER=$(curl -s -H "$SENDER_AUTH" "$BASE/workspaces/$SENDER_ID/activity?limit=20")
|
||||
SENT_ROW=$(echo "$ACT_SENDER" | python3 -c '
|
||||
import json, sys
|
||||
rows = json.load(sys.stdin) or []
|
||||
for r in rows:
|
||||
if r.get("activity_type") == "broadcast_sent":
|
||||
print(json.dumps(r))
|
||||
break
|
||||
')
|
||||
[ -n "$SENT_ROW" ] || {
|
||||
echo " FAIL — could not find broadcast_sent row in sender activity"
|
||||
FAIL=$((FAIL+1))
|
||||
}
|
||||
|
||||
if [ -n "$SENT_ROW" ]; then
|
||||
# Delivered count is baked into the summary field (no response_body for sender row).
|
||||
SUMMARY=$(echo "$SENT_ROW" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("summary",""))')
|
||||
assert_contains "broadcast_sent summary mentions workspace count" "$SUMMARY" "workspace"
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "--- 2f: Sender does NOT receive a broadcast_receive entry ---"
|
||||
SELF_RECV=$(echo "$ACT_SENDER" | python3 -c '
|
||||
import json, sys
|
||||
rows = json.load(sys.stdin) or []
|
||||
for r in rows:
|
||||
if r.get("activity_type") == "broadcast_receive":
|
||||
print("found")
|
||||
break
|
||||
')
|
||||
assert_not_contains "sender has no broadcast_receive in own activity log" "${SELF_RECV:-}" "found"
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
echo ""
|
||||
echo "--- 2g: Empty message is rejected ---"
|
||||
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
|
||||
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
|
||||
-d '{"message":""}')
|
||||
assert "POST /broadcast with empty message returns 400" "$CODE" "400"
|
||||
|
||||
echo ""
|
||||
echo "--- 2h: Partial PATCH does not clobber other flags ---"
|
||||
# Set talk_to_user=false, then patch only broadcast — talk_to_user must stay false.
|
||||
curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
|
||||
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
|
||||
-d '{"talk_to_user_enabled": false}'
|
||||
curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
|
||||
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
|
||||
-d '{"broadcast_enabled": false}'
|
||||
WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
|
||||
TUF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
|
||||
BEF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
|
||||
assert "partial PATCH preserves talk_to_user_enabled=false" "$TUF" "False"
|
||||
assert "partial PATCH sets broadcast_enabled=false" "$BEF" "False"
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
echo ""
|
||||
echo "=== Results: $PASS passed, $FAIL failed ==="
|
||||
[ "$FAIL" -eq 0 ]
|
||||
@@ -97,28 +97,28 @@ const maxProxyResponseBody = 10 << 20
|
||||
//
|
||||
// Timeout model — three independent budgets, none of which gets in each other's way:
|
||||
//
|
||||
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
|
||||
// the entire request including streamed body reads, and would pre-empt
|
||||
// legitimate slow cold-start flows (Claude Code first-token over OAuth
|
||||
// can take 30-60s on boot; long-running agent synthesis can stream
|
||||
// tokens for minutes). Total-request budget is enforced per-request
|
||||
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
|
||||
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
|
||||
// the entire request including streamed body reads, and would pre-empt
|
||||
// legitimate slow cold-start flows (Claude Code first-token over OAuth
|
||||
// can take 30-60s on boot; long-running agent synthesis can stream
|
||||
// tokens for minutes). Total-request budget is enforced per-request
|
||||
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
|
||||
//
|
||||
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
|
||||
// black-holes TCP connects (instance terminated mid-flight, security group
|
||||
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
|
||||
// enough that Cloudflare's ~100s edge timeout can fire first and surface
|
||||
// a generic 502 page to canvas. 10s is well above realistic intra-region
|
||||
// latencies and well below CF's edge timeout.
|
||||
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
|
||||
// black-holes TCP connects (instance terminated mid-flight, security group
|
||||
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
|
||||
// enough that Cloudflare's ~100s edge timeout can fire first and surface
|
||||
// a generic 502 page to canvas. 10s is well above realistic intra-region
|
||||
// latencies and well below CF's edge timeout.
|
||||
//
|
||||
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
|
||||
// to response-headers-start. Configurable via
|
||||
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
|
||||
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
|
||||
// turns (big context + internal delegate_task round-trips routinely exceed
|
||||
// the old 60s ceiling). Body streaming after headers is governed by the
|
||||
// per-request context deadline, NOT this timeout — so multi-minute agent
|
||||
// responses still work fine.
|
||||
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
|
||||
// to response-headers-start. Configurable via
|
||||
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
|
||||
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
|
||||
// turns (big context + internal delegate_task round-trips routinely exceed
|
||||
// the old 60s ceiling). Body streaming after headers is governed by the
|
||||
// per-request context deadline, NOT this timeout — so multi-minute agent
|
||||
// responses still work fine.
|
||||
//
|
||||
// The point of (2) and (3) is to surface a *structured* 503 from
|
||||
// handleA2ADispatchError when the workspace agent is unreachable, so canvas
|
||||
|
||||
@@ -194,7 +194,7 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
|
||||
}
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
|
||||
h.goAsync(func() { h.RestartByID(workspaceID) })
|
||||
go h.RestartByID(workspaceID)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -241,7 +241,7 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
|
||||
}
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
|
||||
h.goAsync(func() { h.RestartByID(workspaceID) })
|
||||
go h.RestartByID(workspaceID)
|
||||
return &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{
|
||||
@@ -262,8 +262,8 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
|
||||
errWsName = workspaceID
|
||||
}
|
||||
summary := "A2A request to " + errWsName + " failed: " + errMsg
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
@@ -277,7 +277,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
|
||||
Status: "error",
|
||||
ErrorDetail: &errMsg,
|
||||
})
|
||||
})
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
@@ -298,19 +298,19 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
// silent workspaces. Only update when callerID is a real workspace (not
|
||||
// canvas, not a system caller) and the target returned 2xx/3xx.
|
||||
if callerID != "" && !isSystemCaller(callerID) && statusCode < 400 {
|
||||
h.goAsync(func() {
|
||||
go func() {
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if _, err := db.DB.ExecContext(bgCtx,
|
||||
`UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID); err != nil {
|
||||
log.Printf("last_outbound_at update failed for %s: %v", callerID, err)
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
toolTrace := extractToolTrace(respBody)
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
@@ -325,7 +325,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
})
|
||||
}(ctx)
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
|
||||
@@ -510,8 +510,8 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
|
||||
wsName = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsName + " (queued for poll)"
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
@@ -523,7 +523,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
|
||||
RequestBody: json.RawMessage(body),
|
||||
Status: "ok",
|
||||
})
|
||||
})
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
|
||||
|
||||
@@ -54,7 +54,6 @@ func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) {
|
||||
_ = setupTestDB(t)
|
||||
stub := &preflightLocalProv{running: true, err: nil}
|
||||
h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, h)
|
||||
h.provisioner = stub
|
||||
|
||||
if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil {
|
||||
@@ -187,8 +186,8 @@ func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) {
|
||||
}
|
||||
|
||||
var (
|
||||
callsIsRunning bool
|
||||
callsContainerInspectRaw bool
|
||||
callsIsRunning bool
|
||||
callsContainerInspectRaw bool
|
||||
callsRunningContainerNameDirect bool
|
||||
)
|
||||
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||
|
||||
@@ -262,7 +262,6 @@ func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) {
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
@@ -325,7 +324,6 @@ func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) {
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
cp := &fakeCPProv{running: true}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
@@ -515,7 +513,6 @@ func TestProxyA2A_AllowedSelf_SkipsAccessCheck(t *testing.T) {
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
@@ -664,18 +661,18 @@ func TestProxyA2A_CallerIDDerivedFromBearer(t *testing.T) {
|
||||
// (column order: workspace_id, activity_type, source_id, target_id, ...)
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs(
|
||||
"ws-target", // $1 workspace_id
|
||||
"a2a_receive", // $2 activity_type
|
||||
sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below
|
||||
sqlmock.AnyArg(), // $4 target_id
|
||||
sqlmock.AnyArg(), // $5 method
|
||||
sqlmock.AnyArg(), // $6 summary
|
||||
sqlmock.AnyArg(), // $7 request_body
|
||||
sqlmock.AnyArg(), // $8 response_body
|
||||
sqlmock.AnyArg(), // $9 tool_trace
|
||||
sqlmock.AnyArg(), // $10 duration_ms
|
||||
sqlmock.AnyArg(), // $11 status
|
||||
sqlmock.AnyArg(), // $12 error_detail
|
||||
"ws-target", // $1 workspace_id
|
||||
"a2a_receive", // $2 activity_type
|
||||
sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below
|
||||
sqlmock.AnyArg(), // $4 target_id
|
||||
sqlmock.AnyArg(), // $5 method
|
||||
sqlmock.AnyArg(), // $6 summary
|
||||
sqlmock.AnyArg(), // $7 request_body
|
||||
sqlmock.AnyArg(), // $8 response_body
|
||||
sqlmock.AnyArg(), // $9 tool_trace
|
||||
sqlmock.AnyArg(), // $10 duration_ms
|
||||
sqlmock.AnyArg(), // $11 status
|
||||
sqlmock.AnyArg(), // $12 error_detail
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
@@ -1719,6 +1716,7 @@ func TestDispatchA2A_RejectsUnsafeURL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// --- handleA2ADispatchError ---
|
||||
|
||||
func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
|
||||
@@ -1805,7 +1803,6 @@ func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
@@ -1958,7 +1955,6 @@ func TestLogA2AFailure_Smoke(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Sync workspace-name lookup (called in the caller goroutine).
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
@@ -1977,7 +1973,6 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Empty name from DB → summary uses the workspaceID as the name.
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
@@ -1994,7 +1989,6 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-ok").
|
||||
@@ -2011,7 +2005,6 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-err").
|
||||
|
||||
@@ -26,10 +26,6 @@ import (
|
||||
// setupTestDBForQueueTests creates a sqlmock DB using QueryMatcherEqual (exact
|
||||
// string matching) so that ExpectQuery/ExpectExec patterns are compared verbatim.
|
||||
// Uses the same global db.DB as setupTestDB so the handler can use it.
|
||||
//
|
||||
// IMPORTANT: db.DB is saved before assignment and restored via t.Cleanup so
|
||||
// that tests running after this one are not polluted by a closed mock.
|
||||
// Same fix as setupTestDB (handlers_test.go); same root cause as mc#975.
|
||||
func setupTestDBForQueueTests(t *testing.T) sqlmock.Sqlmock {
|
||||
t.Helper()
|
||||
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
|
||||
|
||||
@@ -482,6 +482,13 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
if errors.Is(err, ErrTalkToUserDisabled) {
|
||||
c.JSON(http.StatusForbidden, gin.H{
|
||||
"error": "talk_to_user_disabled",
|
||||
"hint": "This workspace is not allowed to send messages directly to the user. Forward your update to a parent workspace using delegate_task — they may be able to reach the user.",
|
||||
})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -464,9 +464,9 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
// Workspace existence check
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces`).
|
||||
mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
|
||||
WithArgs("ws-notify").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
|
||||
|
||||
// Persistence INSERT — verify shape
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
@@ -511,9 +511,9 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces`).
|
||||
mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
|
||||
WithArgs("ws-attach").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
|
||||
|
||||
// Capture the JSONB arg so we can assert on the persisted shape
|
||||
// AFTER the call (must include parts[].kind=file so reload
|
||||
@@ -640,9 +640,9 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces`).
|
||||
mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
|
||||
WithArgs("ws-x").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WillReturnError(fmt.Errorf("simulated db hiccup"))
|
||||
|
||||
|
||||
@@ -54,6 +54,11 @@ import (
|
||||
// timeout) surface as wrapped errors and should be treated as 503.
|
||||
var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found")
|
||||
|
||||
// ErrTalkToUserDisabled is returned when the workspace has
|
||||
// talk_to_user_enabled=false. Callers surface HTTP 403 so the Python tool
|
||||
// can detect it and suggest forwarding to a parent workspace.
|
||||
var ErrTalkToUserDisabled = errors.New("agent_message: talk_to_user disabled")
|
||||
|
||||
// AgentMessageAttachment is one file attached to an agent → user
|
||||
// message. Identical to handlers.NotifyAttachment in field set; kept
|
||||
// distinct so the writer's API doesn't import a handler type with HTTP
|
||||
@@ -107,16 +112,20 @@ func (w *AgentMessageWriter) Send(
|
||||
// notify call surfaced as "workspace not found" and masked real
|
||||
// incidents in the alert path.
|
||||
var wsName string
|
||||
var talkToUserEnabled bool
|
||||
err := w.db.QueryRowContext(ctx,
|
||||
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`,
|
||||
`SELECT name, talk_to_user_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
|
||||
workspaceID,
|
||||
).Scan(&wsName)
|
||||
).Scan(&wsName, &talkToUserEnabled)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrWorkspaceNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("agent_message: workspace lookup: %w", err)
|
||||
}
|
||||
if !talkToUserEnabled {
|
||||
return ErrTalkToUserDisabled
|
||||
}
|
||||
|
||||
// 2. Build broadcast payload + WS-emit. Same shape that ChatTab's
|
||||
// AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has
|
||||
|
||||
@@ -88,9 +88,9 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
@@ -116,9 +116,9 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-att").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
@@ -173,9 +173,9 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-missing").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}))
|
||||
|
||||
err := w.Send(context.Background(), "ws-missing", "lost in the void", nil)
|
||||
if !errors.Is(err, ErrWorkspaceNotFound) {
|
||||
@@ -202,9 +202,9 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-dbfail").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WillReturnError(errors.New("transient db error"))
|
||||
@@ -223,9 +223,9 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-trunc").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
|
||||
|
||||
longMsg := strings.Repeat("x", 200)
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
@@ -263,9 +263,9 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-bc").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Workspace Name", true))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
@@ -315,7 +315,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
transientErr := errors.New("connection refused")
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-dbdown").
|
||||
WillReturnError(transientErr)
|
||||
|
||||
@@ -350,9 +350,9 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
|
||||
// the byte-slice bug.
|
||||
msg := strings.Repeat("你", 200)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-cjk").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(
|
||||
@@ -395,9 +395,9 @@ func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-noatt").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("X", true))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
|
||||
@@ -116,6 +116,9 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
|
||||
"created_at": createdAt,
|
||||
})
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("ListPendingApprovals rows.Err: %v", err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, approvals)
|
||||
}
|
||||
@@ -155,6 +158,9 @@ func (h *ApprovalsHandler) List(c *gin.Context) {
|
||||
"created_at": createdAt,
|
||||
})
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("ListApprovals rows.Err workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, approvals)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
@@ -699,8 +698,7 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
|
||||
|
||||
var result []map[string]interface{}
|
||||
for rows.Next() {
|
||||
var delegationID, callerID, calleeID, taskPreview, status string
|
||||
var resultPreview, errorDetail sql.NullString
|
||||
var delegationID, callerID, calleeID, taskPreview, status, resultPreview, errorDetail string
|
||||
var lastHeartbeat, deadline, createdAt, updatedAt *time.Time
|
||||
if err := rows.Scan(
|
||||
&delegationID, &callerID, &calleeID, &taskPreview,
|
||||
@@ -719,11 +717,11 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
|
||||
"updated_at": updatedAt,
|
||||
"_ledger": true, // marker so callers know this row is from the ledger
|
||||
}
|
||||
if resultPreview.Valid && resultPreview.String != "" {
|
||||
entry["response_preview"] = textutil.TruncateBytes(resultPreview.String, 300)
|
||||
if resultPreview != "" {
|
||||
entry["response_preview"] = textutil.TruncateBytes(resultPreview, 300)
|
||||
}
|
||||
if errorDetail.Valid && errorDetail.String != "" {
|
||||
entry["error"] = errorDetail.String
|
||||
if errorDetail != "" {
|
||||
entry["error"] = errorDetail
|
||||
}
|
||||
if lastHeartbeat != nil {
|
||||
entry["last_heartbeat"] = lastHeartbeat
|
||||
|
||||
@@ -145,54 +145,6 @@ func TestListDelegationsFromLedger_MultipleRows(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListDelegationsFromLedger_NullsOmitted(t *testing.T) {
|
||||
// last_heartbeat, deadline, result_preview, error_detail are all NULL.
|
||||
// Handler must not panic and must omit those keys from the map.
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
|
||||
|
||||
now := time.Now()
|
||||
rows := sqlmock.NewRows([]string{
|
||||
"delegation_id", "caller_id", "callee_id", "task_preview",
|
||||
"status", "result_preview", "error_detail",
|
||||
"last_heartbeat", "deadline", "created_at", "updated_at",
|
||||
}).
|
||||
AddRow("del-1", "ws-1", "ws-2", "task", "queued", nil, nil, nil, nil, now, now)
|
||||
mock.ExpectQuery("SELECT .+ FROM delegations").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(rows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("expected 1 entry, got %d", len(got))
|
||||
}
|
||||
e := got[0]
|
||||
if _, ok := e["last_heartbeat"]; ok {
|
||||
t.Error("last_heartbeat should be absent when NULL")
|
||||
}
|
||||
if _, ok := e["deadline"]; ok {
|
||||
t.Error("deadline should be absent when NULL")
|
||||
}
|
||||
if _, ok := e["response_preview"]; ok {
|
||||
t.Error("response_preview should be absent when NULL result_preview")
|
||||
}
|
||||
if _, ok := e["error"]; ok {
|
||||
t.Error("error should be absent when NULL error_detail")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListDelegationsFromLedger_QueryError(t *testing.T) {
|
||||
// Query failure returns nil — graceful fallback, no panic.
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
@@ -486,3 +438,10 @@ func TestListDelegationsFromActivityLogs_RowsErr(t *testing.T) {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestListDelegationsFromActivityLogs_ScanErrorSkipped is removed.
|
||||
//
|
||||
// Same reason as TestListDelegationsFromLedger_ScanError: Go 1.25 causes
|
||||
// sqlmock.NewRows([]string{}).AddRow(...) to panic in test SETUP. The handler
|
||||
// has no recover(), so a scan panic would crash the process — the correct
|
||||
// behaviour. Real-DB integration tests cover this path.
|
||||
|
||||
@@ -230,20 +230,21 @@ func TestWorkspaceList_WithData(t *testing.T) {
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// 21 cols — see scanWorkspaceRow for order (max_concurrent_tasks
|
||||
// lands between active_tasks and last_error_rate).
|
||||
// 23 cols — broadcast_enabled + talk_to_user_enabled added after monthly_spend
|
||||
// (migration 20260514). Column order must match scanWorkspaceRow exactly.
|
||||
columns := []string{
|
||||
"id", "name", "role", "tier", "status", "agent_card", "url",
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks",
|
||||
"last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
rows := sqlmock.NewRows(columns).
|
||||
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001",
|
||||
nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)).
|
||||
nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0), false, true).
|
||||
AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "",
|
||||
nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0))
|
||||
nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0), false, true)
|
||||
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WillReturnRows(rows)
|
||||
|
||||
@@ -29,11 +29,6 @@ func init() {
|
||||
// setupTestDB creates a sqlmock DB and assigns it to the global db.DB.
|
||||
// It also disables the SSRF URL check so that httptest.NewServer loopback
|
||||
// URLs and fake hostnames (*.example) used in tests don't trigger rejections.
|
||||
//
|
||||
// IMPORTANT: db.DB is saved before assignment and restored via t.Cleanup so
|
||||
// that tests running after this one are not polluted by a closed mock.
|
||||
// This is the single root cause of the systemic CI/Platform (Go) failures on
|
||||
// main HEAD 8026f020 (mc#975).
|
||||
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
|
||||
t.Helper()
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
@@ -62,11 +57,6 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
|
||||
return mock
|
||||
}
|
||||
|
||||
func waitForHandlerAsyncBeforeDBCleanup(t *testing.T, h *WorkspaceHandler) {
|
||||
t.Helper()
|
||||
t.Cleanup(h.waitAsyncForTest)
|
||||
}
|
||||
|
||||
// setupTestRedis creates a miniredis instance and assigns it to the global db.RDB.
|
||||
func setupTestRedis(t *testing.T) *miniredis.Miniredis {
|
||||
t.Helper()
|
||||
@@ -366,11 +356,6 @@ func TestWorkspaceCreate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuildProvisionerConfig_IncludesAwarenessSettings(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery(`SELECT digest FROM runtime_image_pins`).
|
||||
WithArgs("claude-code").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
|
||||
|
||||
@@ -407,21 +392,21 @@ func TestWorkspaceList(t *testing.T) {
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
|
||||
|
||||
// 21 cols: `max_concurrent_tasks` added between active_tasks and
|
||||
// last_error_rate (see scanWorkspaceRow + COALESCE(w.max_concurrent_tasks, 1)
|
||||
// in workspace.go). Column order must match that scan exactly.
|
||||
// 23 cols: broadcast_enabled + talk_to_user_enabled added after monthly_spend
|
||||
// (migration 20260514). Column order must match scanWorkspaceRow exactly.
|
||||
columns := []string{
|
||||
"id", "name", "role", "tier", "status", "agent_card", "url",
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks",
|
||||
"last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
rows := sqlmock.NewRows(columns).
|
||||
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001",
|
||||
nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)).
|
||||
nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0), false, true).
|
||||
AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "",
|
||||
nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0))
|
||||
nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0), false, true)
|
||||
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WillReturnRows(rows)
|
||||
@@ -1135,13 +1120,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) {
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs("dddddddd-0004-0000-0000-000000000000").
|
||||
WillReturnRows(sqlmock.NewRows(columns).AddRow(
|
||||
"dddddddd-0004-0000-0000-000000000000", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000",
|
||||
nil, 2, 1, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false,
|
||||
nil, int64(0),
|
||||
nil, int64(0), false, true,
|
||||
))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
@@ -248,6 +248,9 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
|
||||
b.WriteString(content)
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("ResolveInstructions rows.Err workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"workspace_id": workspaceID,
|
||||
@@ -258,6 +261,7 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
|
||||
func scanInstructions(rows interface {
|
||||
Next() bool
|
||||
Scan(dest ...interface{}) error
|
||||
Err() error
|
||||
}) []Instruction {
|
||||
var instructions []Instruction
|
||||
for rows.Next() {
|
||||
@@ -269,6 +273,9 @@ func scanInstructions(rows interface {
|
||||
}
|
||||
instructions = append(instructions, inst)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("scanInstructions rows.Err: %v", err)
|
||||
}
|
||||
if instructions == nil {
|
||||
instructions = []Instruction{}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -751,9 +751,9 @@ func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-err").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
|
||||
|
||||
// INSERT fails — must NOT abort the tool response.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
@@ -802,9 +802,9 @@ func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
|
||||
|
||||
const userMessage = "Hi there from the agent"
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-shape").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
|
||||
|
||||
// Capture the response_body argument and assert its exact shape.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
@@ -861,9 +861,9 @@ func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
|
||||
// before it does anything else. Returning a name lets the
|
||||
// broadcast payload populate; the test doesn't assert on the
|
||||
// broadcast (no observable WS in this fake), only on the DB.
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
|
||||
WithArgs("ws-msg").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
|
||||
|
||||
// The persistence INSERT — pin the exact shape so a future
|
||||
// refactor that switches columns or drops `method='notify'`
|
||||
|
||||
@@ -80,26 +80,103 @@ func hasUnresolvedVarRef(original, expanded string) bool {
|
||||
}
|
||||
|
||||
// expandWithEnv expands ${VAR} and $VAR references in s using the env map.
|
||||
// Falls back to the platform process env if a var isn't in the map.
|
||||
// Shell variables must start with a letter or '_' per POSIX; invalid identifiers
|
||||
// are returned literally so that "$100" and "$5" stay as-is.
|
||||
// Falls back to the platform process env only when the whole value is a
|
||||
// single variable reference; embedded process-env expansion is too broad for
|
||||
// imported org YAML because host variables such as HOME are not template data.
|
||||
func expandWithEnv(s string, env map[string]string) string {
|
||||
return os.Expand(s, func(key string) string {
|
||||
if len(key) == 0 {
|
||||
return "$"
|
||||
if s == "" {
|
||||
return ""
|
||||
}
|
||||
var b strings.Builder
|
||||
for i := 0; i < len(s); {
|
||||
if s[i] != '$' {
|
||||
b.WriteByte(s[i])
|
||||
i++
|
||||
continue
|
||||
}
|
||||
c := key[0]
|
||||
if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_') {
|
||||
return "$" + key // not a valid shell identifier — return literal
|
||||
|
||||
if i+1 >= len(s) {
|
||||
b.WriteByte('$')
|
||||
i++
|
||||
continue
|
||||
}
|
||||
if v, ok := env[key]; ok {
|
||||
return v
|
||||
|
||||
if s[i+1] == '{' {
|
||||
end := strings.IndexByte(s[i+2:], '}')
|
||||
if end < 0 {
|
||||
b.WriteByte('$')
|
||||
i++
|
||||
continue
|
||||
}
|
||||
end += i + 2
|
||||
key := s[i+2 : end]
|
||||
ref := s[i : end+1]
|
||||
b.WriteString(expandEnvRef(key, ref, s, env))
|
||||
i = end + 1
|
||||
continue
|
||||
}
|
||||
return os.Getenv(key)
|
||||
})
|
||||
|
||||
if !isEnvIdentStart(s[i+1]) {
|
||||
b.WriteByte('$')
|
||||
i++
|
||||
continue
|
||||
}
|
||||
j := i + 2
|
||||
for j < len(s) && isEnvIdentPart(s[j]) {
|
||||
j++
|
||||
}
|
||||
key := s[i+1 : j]
|
||||
ref := s[i:j]
|
||||
b.WriteString(expandEnvRef(key, ref, s, env))
|
||||
i = j
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
|
||||
// expandEnvRef resolves a single variable reference extracted from s.
|
||||
//
|
||||
// Guards:
|
||||
// - Empty key → "$$" escape, return "$"
|
||||
// - key[0] not POSIX ident start → "$" + partial chars, return "$<chars>"
|
||||
// - Key in env map → return the mapped value (template override wins)
|
||||
// - Otherwise → only fall back to os.Getenv if the whole input string IS the
|
||||
// variable reference (ref == whole).
|
||||
//
|
||||
// Bare $VAR format:
|
||||
// $HOME (alone) → ref==whole → os.Getenv ✓ (host HOME is org-template HOME)
|
||||
// $HOME/path (partial) → ref!=whole → literal "$HOME" ✓ (CWE-78: prevents host leak)
|
||||
//
|
||||
// Braced ${VAR} format:
|
||||
// ${HOME} (alone) → ref==whole → os.Getenv ✓
|
||||
// ${ROLE}/admin (partial) → ref!=whole → literal ✓
|
||||
// "yes and ${NOT_SET}" (embedded) → ref!=whole → literal ✓
|
||||
//
|
||||
// This is the CWE-78 fix from commit a3a358f9.
|
||||
func expandEnvRef(key, ref, whole string, env map[string]string) string {
|
||||
if key == "" {
|
||||
return "$"
|
||||
}
|
||||
if !isEnvIdentStart(key[0]) {
|
||||
return "$" + key
|
||||
}
|
||||
if v, ok := env[key]; ok {
|
||||
return v
|
||||
}
|
||||
if ref == whole {
|
||||
return os.Getenv(key)
|
||||
}
|
||||
return ref
|
||||
}
|
||||
|
||||
func isEnvIdentStart(c byte) bool {
|
||||
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_'
|
||||
}
|
||||
|
||||
func isEnvIdentPart(c byte) bool {
|
||||
return isEnvIdentStart(c) || (c >= '0' && c <= '9')
|
||||
}
|
||||
|
||||
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env .env and the workspace-specific .env
|
||||
// (workspace overrides org root). Used by both secret injection and channel
|
||||
// config expansion.
|
||||
//
|
||||
|
||||
@@ -104,8 +104,8 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) {
|
||||
// documents this design choice; callers who need empty=resolved should
|
||||
// pre-process the output before calling hasUnresolvedVarRef.
|
||||
{"${VAR}", "", true},
|
||||
{"${VAR}", "value", false}, // var replaced
|
||||
{"$VAR", "value", false}, // bare var replaced
|
||||
{"${VAR}", "value", false}, // var replaced
|
||||
{"$VAR", "value", false}, // bare var replaced
|
||||
{"prefix${VAR}suffix", "prefixvaluesuffix", false},
|
||||
{"${A}${B}", "ab", false},
|
||||
// FOO=FOO and BAR=BAR — both vars found and replaced. Expanded output
|
||||
@@ -125,14 +125,14 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) {
|
||||
func TestHasUnresolvedVarRef_Unresolved(t *testing.T) {
|
||||
// Expansion left the refs intact → unresolved.
|
||||
cases := []struct {
|
||||
orig string
|
||||
orig string
|
||||
expanded string
|
||||
}{
|
||||
{"${VAR}", "${VAR}"}, // untouched
|
||||
{"$VAR", "$VAR"}, // bare untouched
|
||||
{"${VAR}", "${VAR}"}, // untouched
|
||||
{"$VAR", "$VAR"}, // bare untouched
|
||||
{"prefix${VAR}suffix", "prefix${VAR}suffix"},
|
||||
{"${A}${B}", "${A}${B}"}, // both unresolved
|
||||
{"${FOO}", ""}, // empty result with var ref in original
|
||||
{"${A}${B}", "${A}${B}"}, // both unresolved
|
||||
{"${FOO}", ""}, // empty result with var ref in original
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.orig, func(t *testing.T) {
|
||||
@@ -205,8 +205,8 @@ func TestMergeCategoryRouting_WorkspaceOverrides(t *testing.T) {
|
||||
"ui": {"Frontend Engineer"},
|
||||
}
|
||||
ws := map[string][]string{
|
||||
"security": {"SRE Team"}, // narrows
|
||||
"ui": {}, // drops
|
||||
"security": {"SRE Team"}, // narrows
|
||||
"ui": {}, // drops
|
||||
"infra": {"Platform Team"}, // adds
|
||||
}
|
||||
r := mergeCategoryRouting(defaults, ws)
|
||||
@@ -462,47 +462,11 @@ func TestExpandWithEnv_LiteralDollar(t *testing.T) {
|
||||
func TestExpandWithEnv_PartiallyPresent(t *testing.T) {
|
||||
env := map[string]string{"SET": "yes"}
|
||||
result := expandWithEnv("${SET} and ${NOT_SET}", env)
|
||||
// ${SET} resolved from env; ${NOT_SET} stays literal (not whole-string ref,
|
||||
// so os.Getenv fallback is NOT used — CWE-78 regression guard).
|
||||
assert.Equal(t, "yes and ${NOT_SET}", result)
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_EmbeddedMissingProcessEnvStaysLiteral(t *testing.T) {
|
||||
t.Setenv("MOL_TEST_EMBEDDED_MISSING", "")
|
||||
|
||||
result := expandWithEnv("prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", map[string]string{})
|
||||
assert.Equal(t, "prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", result)
|
||||
}
|
||||
|
||||
// POSIX identifier guard regression tests (CWE-78 fix).
|
||||
// Keys not starting with [a-zA-Z_] must not be looked up in env or os.Getenv.
|
||||
func TestExpandWithEnv_DigitPrefix_NotExpanded(t *testing.T) {
|
||||
// ${0}, ${5}, ${1VAR} — numeric prefix → not a valid shell identifier.
|
||||
// Guard must return "$0", "$5", "$1VAR" literally; no env lookup.
|
||||
cases := []struct {
|
||||
input string
|
||||
want string
|
||||
}{
|
||||
{"${0}", "$0"},
|
||||
{"${5}", "$5"},
|
||||
{"${1VAR}", "$1VAR"},
|
||||
{"prefix ${0} suffix", "prefix $0 suffix"},
|
||||
{"$0", "$0"},
|
||||
{"$5", "$5"},
|
||||
{"HOME=${HOME}", "HOME=${HOME}"}, // HOME is valid but embedded in larger string
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.input, func(t *testing.T) {
|
||||
got := expandWithEnv(tc.input, map[string]string{})
|
||||
assert.Equal(t, tc.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_EmptyKey_ReturnsDollar(t *testing.T) {
|
||||
// ${} → "$" (empty key, guard returns "$")
|
||||
result := expandWithEnv("value=${}", map[string]string{})
|
||||
assert.Equal(t, "value=$", result)
|
||||
}
|
||||
|
||||
// mergeCategoryRouting tests — unions defaults with per-workspace routing.
|
||||
|
||||
// ── Additional coverage: mergeCategoryRouting ──────────────────────
|
||||
@@ -582,8 +546,8 @@ func TestRenderCategoryRoutingYAML_SingleCategory(t *testing.T) {
|
||||
|
||||
func TestRenderCategoryRoutingYAML_MultipleCategoriesSorted(t *testing.T) {
|
||||
routing := map[string][]string{
|
||||
"zebra": {"RoleZ"},
|
||||
"alpha": {"RoleA"},
|
||||
"zebra": {"RoleZ"},
|
||||
"alpha": {"RoleA"},
|
||||
"middleware": {"RoleM"},
|
||||
}
|
||||
result, err := renderCategoryRoutingYAML(routing)
|
||||
@@ -626,7 +590,7 @@ func TestRenderCategoryRoutingYAML_SpecialCharactersEscaped(t *testing.T) {
|
||||
// ── Additional coverage: appendYAMLBlock ───────────────────────────
|
||||
func TestAppendYAMLBlock_BothEmpty(t *testing.T) {
|
||||
result := appendYAMLBlock(nil, "")
|
||||
assert.Nil(t, result)
|
||||
assert.Nil(t, result) // append(nil, []byte("")...) returns nil in Go
|
||||
}
|
||||
|
||||
func TestAppendYAMLBlock_ExistingHasNewline(t *testing.T) {
|
||||
|
||||
@@ -276,3 +276,121 @@ func TestMergeCategoryRouting_OriginalMapsUnmodified(t *testing.T) {
|
||||
t.Error("ws routing should be unmodified after merge")
|
||||
}
|
||||
}
|
||||
|
||||
// ── expandWithEnv ─────────────────────────────────────────────────────────────
|
||||
//
|
||||
// CWE-78 regression tests. The original fix (a3a358f9) ensures that partial
|
||||
// variable references like $HOME/path are NOT resolved via os.Getenv — the
|
||||
// host HOME env var must not leak into org template values. Only whole-string
|
||||
// references ($VAR or ${VAR}) may fall back to the host process environment.
|
||||
|
||||
func TestExpandWithEnv_PartialRefDollarHomePath(t *testing.T) {
|
||||
// $HOME/path must NOT resolve to the host's HOME env var.
|
||||
// The literal $HOME must be returned as-is.
|
||||
got := expandWithEnv("$HOME/path", nil)
|
||||
if got != "$HOME/path" {
|
||||
t.Errorf("$HOME/path: got %q, want literal $HOME/path", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_PartialRefBracedRoleAdmin(t *testing.T) {
|
||||
// ${ROLE}/admin — ROLE is not in env, so expand to the literal ${ROLE}/admin.
|
||||
got := expandWithEnv("${ROLE}/admin", nil)
|
||||
if got != "${ROLE}/admin" {
|
||||
t.Errorf("${ROLE}/admin: got %q, want literal ${ROLE}/admin", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_PartialRefMiddleOfString(t *testing.T) {
|
||||
// $ROLE in the middle of a string — literal, not os.Getenv.
|
||||
got := expandWithEnv("prefix/$ROLE/suffix", nil)
|
||||
if got != "prefix/$ROLE/suffix" {
|
||||
t.Errorf("prefix/$ROLE/suffix: got %q, want literal", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_WholeVarInEnv(t *testing.T) {
|
||||
// Whole-string $VAR that IS in env — env value wins.
|
||||
env := map[string]string{"FOO": "barvalue"}
|
||||
got := expandWithEnv("$FOO", env)
|
||||
if got != "barvalue" {
|
||||
t.Errorf("$FOO with FOO=barvalue: got %q, want barvalue", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_WholeVarBracedInEnv(t *testing.T) {
|
||||
// Whole-string ${VAR} that IS in env — env value wins.
|
||||
env := map[string]string{"FOO": "barvalue"}
|
||||
got := expandWithEnv("${FOO}", env)
|
||||
if got != "barvalue" {
|
||||
t.Errorf("${FOO} with FOO=barvalue: got %q, want barvalue", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_WholeVarNotInEnvBare(t *testing.T) {
|
||||
// Whole-string $VAR not in env — falls back to os.Getenv.
|
||||
// If the host has the var, we get the host value. If not, empty.
|
||||
// At minimum, the result must NOT be the literal "$UNDEFINED_VAR_9Z".
|
||||
got := expandWithEnv("$UNDEFINED_VAR_9Z", nil)
|
||||
if got == "$UNDEFINED_VAR_9Z" {
|
||||
t.Errorf("$UNDEFINED_VAR_9Z: should expand (whole-string fallback to os.Getenv), got literal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_WholeVarNotInEnvBraced(t *testing.T) {
|
||||
// Whole-string ${VAR} not in env — falls back to os.Getenv.
|
||||
got := expandWithEnv("${UNDEFINED_VAR_9Z}", nil)
|
||||
if got == "${UNDEFINED_VAR_9Z}" {
|
||||
t.Errorf("${UNDEFINED_VAR_9Z}: should expand (whole-string fallback to os.Getenv), got literal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_EmptyString(t *testing.T) {
|
||||
got := expandWithEnv("", map[string]string{"FOO": "bar"})
|
||||
if got != "" {
|
||||
t.Errorf("empty string: got %q, want empty", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_NoVarRefs(t *testing.T) {
|
||||
got := expandWithEnv("plain string with no vars", map[string]string{"FOO": "bar"})
|
||||
if got != "plain string with no vars" {
|
||||
t.Errorf("plain string: got %q, want unchanged", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_MultipleVarRefs(t *testing.T) {
|
||||
// Two vars, both whole — both expand from env.
|
||||
env := map[string]string{"A": "alpha", "B": "beta"}
|
||||
got := expandWithEnv("$A and $B and more", env)
|
||||
if got != "alpha and beta and more" {
|
||||
t.Errorf("multiple vars: got %q, want alpha and beta and more", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_NumericVarRef(t *testing.T) {
|
||||
// $5 — starts with digit, not a valid identifier start.
|
||||
// Must return the literal "$5", not expand via os.Getenv.
|
||||
got := expandWithEnv("$5", map[string]string{"5": "five"})
|
||||
if got != "$5" {
|
||||
t.Errorf("$5: got %q, want literal $5", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_DollarEscape(t *testing.T) {
|
||||
// $$ → both $ written literally (each $ is not followed by an identifier char,
|
||||
// so it is written as-is). No special escape sequence for $$.
|
||||
got := expandWithEnv("$$", nil)
|
||||
if got != "$$" {
|
||||
t.Errorf("$$: got %q, want literal $$", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandWithEnv_MixedPartialAndWhole(t *testing.T) {
|
||||
// $A is in env (whole), $HOME is partial — only $A expands.
|
||||
env := map[string]string{"A": "alpha"}
|
||||
got := expandWithEnv("$A at $HOME", env)
|
||||
if got != "alpha at $HOME" {
|
||||
t.Errorf("$A at $HOME: got %q, want alpha at $HOME", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,11 +342,6 @@ func TestPluginInstall_InstanceLookupError_Returns503(t *testing.T) {
|
||||
// ---------- dispatch: uninstall ----------
|
||||
|
||||
func TestPluginUninstall_SaaS_DispatchesToEIC(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectExec("DELETE FROM workspace_plugins WHERE workspace_id").
|
||||
WithArgs("ws-1", "browser-automation").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
stubReadPluginManifestViaEIC(t, func(ctx context.Context, instanceID, runtime, pluginName string) ([]byte, error) {
|
||||
return []byte("name: browser-automation\nskills:\n - browse\n"), nil
|
||||
})
|
||||
|
||||
@@ -629,9 +629,6 @@ func TestPluginInstall_RejectsUnknownScheme(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPluginInstall_LocalSourceReachesContainerLookup(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectAllowlistAllowAll(mock)
|
||||
|
||||
base := t.TempDir()
|
||||
pluginDir := filepath.Join(base, "demo")
|
||||
_ = os.MkdirAll(pluginDir, 0o755)
|
||||
@@ -958,14 +955,14 @@ func TestLogInstallLimitsOnce(t *testing.T) {
|
||||
|
||||
func TestRegexpEscapeForAwk(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"my-plugin": `my-plugin`,
|
||||
"# Plugin: foo /": `# Plugin: foo \/`,
|
||||
"# Plugin: a.b /": `# Plugin: a\.b \/`,
|
||||
"foo[bar]": `foo\[bar\]`,
|
||||
"a*b+c?": `a\*b\+c\?`,
|
||||
"path|with|pipes": `path\|with\|pipes`,
|
||||
`back\slash`: `back\\slash`,
|
||||
"": ``,
|
||||
"my-plugin": `my-plugin`,
|
||||
"# Plugin: foo /": `# Plugin: foo \/`,
|
||||
"# Plugin: a.b /": `# Plugin: a\.b \/`,
|
||||
"foo[bar]": `foo\[bar\]`,
|
||||
"a*b+c?": `a\*b\+c\?`,
|
||||
"path|with|pipes": `path\|with\|pipes`,
|
||||
`back\slash`: `back\\slash`,
|
||||
"": ``,
|
||||
}
|
||||
for in, want := range cases {
|
||||
got := regexpEscapeForAwk(in)
|
||||
@@ -1250,7 +1247,7 @@ func TestPluginDownload_GithubSchemeStreamsTarball(t *testing.T) {
|
||||
scheme: "github",
|
||||
fetchFn: func(_ context.Context, _ string, dst string) (string, error) {
|
||||
files := map[string]string{
|
||||
"plugin.yaml": "name: remote-plugin\nversion: 1.0.0\n",
|
||||
"plugin.yaml": "name: remote-plugin\nversion: 1.0.0\n",
|
||||
"skills/x/SKILL.md": "---\nname: x\n---\n",
|
||||
"adapters/claude_code.py": "from plugins_registry.builtins import AgentskillsAdaptor as Adaptor\n",
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
|
||||
// Non-blocking send — don't stall the restart cycle.
|
||||
// Run in a detached goroutine so the caller (runRestartCycle) can
|
||||
// proceed to stopForRestart without waiting.
|
||||
h.goAsync(func() {
|
||||
go func() {
|
||||
signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout)
|
||||
defer cancel()
|
||||
|
||||
@@ -109,7 +109,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
|
||||
} else {
|
||||
log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode)
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
// resolveAgentURLForRestartSignal returns the routable URL for the workspace
|
||||
|
||||
@@ -271,7 +271,6 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
|
||||
WorkspaceHandler: newHandlerWithTestDeps(t),
|
||||
errToReturn: context.DeadlineExceeded,
|
||||
}
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
|
||||
|
||||
hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111")
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
@@ -64,7 +64,7 @@ func (h *SecretsHandler) List(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("List secrets rows.Err: %v", err)
|
||||
log.Printf("List workspace secrets iteration error: %v", err)
|
||||
}
|
||||
|
||||
// 2. Global secrets not overridden at workspace level
|
||||
@@ -95,7 +95,7 @@ func (h *SecretsHandler) List(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
if err := globalRows.Err(); err != nil {
|
||||
log.Printf("List secrets (global) rows.Err: %v", err)
|
||||
log.Printf("List global secrets iteration error: %v", err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, secrets)
|
||||
@@ -181,7 +181,7 @@ func (h *SecretsHandler) Values(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
if err := globalRows.Err(); err != nil {
|
||||
log.Printf("secrets.Values globalRows.Err: %v", err)
|
||||
log.Printf("secrets.Values: global rows iteration error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,7 +205,7 @@ func (h *SecretsHandler) Values(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
if err := wsRows.Err(); err != nil {
|
||||
log.Printf("secrets.Values wsRows.Err: %v", err)
|
||||
log.Printf("secrets.Values: workspace rows iteration error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,7 +337,7 @@ func (h *SecretsHandler) ListGlobal(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("ListGlobal rows.Err: %v", err)
|
||||
log.Printf("ListGlobal iteration error: %v", err)
|
||||
}
|
||||
c.JSON(http.StatusOK, secrets)
|
||||
}
|
||||
@@ -416,7 +416,7 @@ func (h *SecretsHandler) restartAllAffectedByGlobalKey(key string) {
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("restartAllAffectedByGlobalKey rows.Err: %v", err)
|
||||
log.Printf("restartAllAffectedByGlobalKey: iteration error: %v", err)
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return
|
||||
|
||||
@@ -340,11 +340,6 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) {
|
||||
// a workspace must still be able to access its own terminal. The CanCommunicate
|
||||
// fast-path returns true when callerID == targetID.
|
||||
func TestTerminalConnect_KI005_AllowsOwnTerminal(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery("SELECT COALESCE").
|
||||
WithArgs("ws-alice").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
|
||||
|
||||
// CanCommunicate fast-path: callerID == targetID → returns true without DB.
|
||||
prev := canCommunicateCheck
|
||||
canCommunicateCheck = func(callerID, targetID string) bool { return callerID == targetID }
|
||||
@@ -372,11 +367,6 @@ func TestTerminalConnect_KI005_AllowsOwnTerminal(t *testing.T) {
|
||||
// skip the CanCommunicate check entirely and fall through to the Docker auth path.
|
||||
// We assert they get the nil-docker 503 instead of 403.
|
||||
func TestTerminalConnect_KI005_SkipsCheckWithoutHeader(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery("SELECT COALESCE").
|
||||
WithArgs("ws-any").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
|
||||
|
||||
h := NewTerminalHandler(nil) // nil docker → 503 if reached
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -449,9 +439,6 @@ func TestTerminalConnect_KI005_AllowsSiblingWorkspace(t *testing.T) {
|
||||
mock.ExpectExec(`UPDATE workspace_auth_tokens SET last_used_at`).
|
||||
WithArgs(sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT COALESCE").
|
||||
WithArgs("ws-dev").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
|
||||
|
||||
h := NewTerminalHandler(nil)
|
||||
w := httptest.NewRecorder()
|
||||
@@ -476,10 +463,7 @@ func TestTerminalConnect_KI005_AllowsSiblingWorkspace(t *testing.T) {
|
||||
// introduced in GH#1885: internal routing uses org tokens which are not in
|
||||
// workspace_auth_tokens, so ValidateToken would always fail for them.
|
||||
func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) {
|
||||
mock := setupTestDB(t) // no ValidateToken ExpectQuery — none should fire
|
||||
mock.ExpectQuery("SELECT COALESCE").
|
||||
WithArgs("ws-target").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
|
||||
setupTestDB(t) // no ValidateToken ExpectQuery — none should fire
|
||||
prev := canCommunicateCheck
|
||||
canCommunicateCheck = func(callerID, targetID string) bool {
|
||||
// Simulate platform agent → target workspace (same org).
|
||||
@@ -560,3 +544,4 @@ func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) {
|
||||
args)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -67,6 +67,9 @@ func (h *TokenHandler) List(c *gin.Context) {
|
||||
}
|
||||
tokens = append(tokens, t)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("ListTokens rows.Err workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"tokens": tokens,
|
||||
|
||||
@@ -74,7 +74,10 @@ type WorkspaceHandler struct {
|
||||
// memory plugin). main.go sets this to plugin.DeleteNamespace
|
||||
// when MEMORY_PLUGIN_URL is configured.
|
||||
namespaceCleanupFn func(ctx context.Context, workspaceID string)
|
||||
asyncWG sync.WaitGroup
|
||||
// asyncWG tracks goroutines launched by goAsync so tests can wait
|
||||
// for async DB users (restart, provision) before asserting results.
|
||||
// Matches the pattern from main commit 1c3b4ff3.
|
||||
asyncWG sync.WaitGroup
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) goAsync(fn func()) {
|
||||
@@ -592,7 +595,7 @@ func scanWorkspaceRow(rows interface {
|
||||
var id, name, role, status, url, sampleError, currentTask, runtime, workspaceDir string
|
||||
var tier, activeTasks, maxConcurrentTasks, uptimeSeconds int
|
||||
var errorRate, x, y float64
|
||||
var collapsed bool
|
||||
var collapsed, broadcastEnabled, talkToUserEnabled bool
|
||||
var parentID *string
|
||||
var agentCard []byte
|
||||
var budgetLimit sql.NullInt64
|
||||
@@ -601,7 +604,7 @@ func scanWorkspaceRow(rows interface {
|
||||
err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url,
|
||||
&parentID, &activeTasks, &maxConcurrentTasks, &errorRate, &sampleError, &uptimeSeconds,
|
||||
¤tTask, &runtime, &workspaceDir, &x, &y, &collapsed,
|
||||
&budgetLimit, &monthlySpend)
|
||||
&budgetLimit, &monthlySpend, &broadcastEnabled, &talkToUserEnabled)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -625,6 +628,8 @@ func scanWorkspaceRow(rows interface {
|
||||
"x": x,
|
||||
"y": y,
|
||||
"collapsed": collapsed,
|
||||
"broadcast_enabled": broadcastEnabled,
|
||||
"talk_to_user_enabled": talkToUserEnabled,
|
||||
}
|
||||
|
||||
// budget_limit: nil when no limit set, int64 otherwise
|
||||
@@ -660,7 +665,8 @@ const workspaceListQuery = `
|
||||
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
|
||||
COALESCE(w.workspace_dir, ''),
|
||||
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
|
||||
w.budget_limit, COALESCE(w.monthly_spend, 0)
|
||||
w.budget_limit, COALESCE(w.monthly_spend, 0),
|
||||
w.broadcast_enabled, w.talk_to_user_enabled
|
||||
FROM workspaces w
|
||||
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
|
||||
WHERE w.status != 'removed'
|
||||
@@ -720,7 +726,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
|
||||
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
|
||||
COALESCE(w.workspace_dir, ''),
|
||||
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
|
||||
w.budget_limit, COALESCE(w.monthly_spend, 0)
|
||||
w.budget_limit, COALESCE(w.monthly_spend, 0),
|
||||
w.broadcast_enabled, w.talk_to_user_enabled
|
||||
FROM workspaces w
|
||||
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
|
||||
WHERE w.id = $1
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
package handlers
|
||||
|
||||
// workspace_abilities.go — PATCH /workspaces/:id/abilities
|
||||
//
|
||||
// Allows users and admin agents to toggle two workspace-level ability flags:
|
||||
//
|
||||
// broadcast_enabled — workspace may POST /broadcast to send org-wide messages
|
||||
// talk_to_user_enabled — workspace may deliver canvas chat messages via
|
||||
// send_message_to_user / POST /notify
|
||||
//
|
||||
// Gated behind AdminAuth so workspace agents cannot self-modify their own
|
||||
// ability flags (that would let any agent grant itself broadcast rights or
|
||||
// suppress its own chat-silence constraint).
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// AbilitiesPayload carries the subset of ability flags the caller wants to
|
||||
// update. Fields are pointers so that the handler can distinguish "caller
|
||||
// supplied false" from "caller omitted the field" (omitempty semantics).
|
||||
type AbilitiesPayload struct {
|
||||
BroadcastEnabled *bool `json:"broadcast_enabled"`
|
||||
TalkToUserEnabled *bool `json:"talk_to_user_enabled"`
|
||||
}
|
||||
|
||||
// PatchAbilities handles PATCH /workspaces/:id/abilities (AdminAuth).
|
||||
func PatchAbilities(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
if err := validateWorkspaceID(id); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
|
||||
return
|
||||
}
|
||||
|
||||
var body AbilitiesPayload
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||
return
|
||||
}
|
||||
if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "at least one ability field required"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var exists bool
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND status != 'removed')`, id,
|
||||
).Scan(&exists); err != nil || !exists {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
|
||||
if body.BroadcastEnabled != nil {
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`,
|
||||
id, *body.BroadcastEnabled,
|
||||
); err != nil {
|
||||
log.Printf("PatchAbilities broadcast_enabled for %s: %v", id, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if body.TalkToUserEnabled != nil {
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`,
|
||||
id, *body.TalkToUserEnabled,
|
||||
); err != nil {
|
||||
log.Printf("PatchAbilities talk_to_user_enabled for %s: %v", id, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "updated"})
|
||||
}
|
||||
@@ -0,0 +1,386 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ---------- Validation ----------
|
||||
|
||||
func TestPatchAbilities_InvalidWorkspaceID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// No DB calls expected — validation fails first.
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/workspaces/not-a-uuid/abilities", bytes.NewBufferString(`{"broadcast_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["error"] != "invalid workspace ID" {
|
||||
t.Errorf("unexpected error: %v", resp["error"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_NoFieldsProvided(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["error"] != "at least one ability field required" {
|
||||
t.Errorf("unexpected error: %v", resp["error"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_InvalidBody(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{invalid json}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Workspace existence check ----------
|
||||
|
||||
func TestPatchAbilities_WorkspaceNotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_ExistsCheckDBError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnError(errors.New("connection lost"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
// DB error returns 404 (same as "not found").
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- BroadcastEnabled ----------
|
||||
|
||||
func TestPatchAbilities_BroadcastEnabled_True(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, true).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["status"] != "updated" {
|
||||
t.Errorf("expected status 'updated', got %v", resp["status"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_BroadcastEnabled_False(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, false).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":false}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_BroadcastEnabled_UpdateFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, true).
|
||||
WillReturnError(errors.New("update failed"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- TalkToUserEnabled ----------
|
||||
|
||||
func TestPatchAbilities_TalkToUserEnabled_True(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, true).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"talk_to_user_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_TalkToUserEnabled_False(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, false).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"talk_to_user_enabled":false}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatchAbilities_TalkToUserEnabled_UpdateFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, false).
|
||||
WillReturnError(errors.New("connection lost"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"talk_to_user_enabled":false}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Both fields together ----------
|
||||
|
||||
func TestPatchAbilities_BothFields(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, true).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, false).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":true,"talk_to_user_enabled":false}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["status"] != "updated" {
|
||||
t.Errorf("expected status 'updated', got %v", resp["status"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- BroadcastEnabled with nil TalkToUserEnabled ----------
|
||||
|
||||
func TestPatchAbilities_BroadcastOnly_OmitsTalkToUser(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
wsID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
// Only broadcast_enabled update; talk_to_user_enabled must NOT be updated.
|
||||
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs(wsID, true).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
c.Request = httptest.NewRequest("PATCH", "/abilities", bytes.NewBufferString(`{"broadcast_enabled":true}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
PatchAbilities(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations — talk_to_user_enabled should not be updated: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
package handlers
|
||||
|
||||
// workspace_broadcast.go — POST /workspaces/:id/broadcast
|
||||
//
|
||||
// Allows a workspace with broadcast_enabled=true to send a message to every
|
||||
// non-removed agent workspace in the org. The message is:
|
||||
//
|
||||
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
|
||||
// so poll-mode agents pick it up via GET /activity.
|
||||
// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can
|
||||
// show a real-time banner for each recipient workspace.
|
||||
//
|
||||
// The sender's own workspace logs a 'broadcast_sent' activity row for
|
||||
// traceability.
|
||||
//
|
||||
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
|
||||
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
|
||||
// TOCTOU — the middleware only proved the token is valid, not the ability.
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// BroadcastHandler is constructed once and shared across requests.
|
||||
type BroadcastHandler struct {
|
||||
broadcaster *events.Broadcaster
|
||||
}
|
||||
|
||||
// NewBroadcastHandler creates a BroadcastHandler.
|
||||
func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
|
||||
return &BroadcastHandler{broadcaster: b}
|
||||
}
|
||||
|
||||
// Broadcast handles POST /workspaces/:id/broadcast.
|
||||
func (h *BroadcastHandler) Broadcast(c *gin.Context) {
|
||||
senderID := c.Param("id")
|
||||
if err := validateWorkspaceID(senderID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
|
||||
return
|
||||
}
|
||||
|
||||
var body struct {
|
||||
Message string `json:"message" binding:"required"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Verify sender exists and has broadcast_enabled=true.
|
||||
var senderName string
|
||||
var broadcastEnabled bool
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
|
||||
senderID,
|
||||
).Scan(&senderName, &broadcastEnabled)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
if !broadcastEnabled {
|
||||
c.JSON(http.StatusForbidden, gin.H{
|
||||
"error": "broadcast_disabled",
|
||||
"hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Collect all non-removed agent workspaces (excludes the sender itself).
|
||||
rows, err := db.DB.QueryContext(ctx,
|
||||
`SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`,
|
||||
senderID,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var recipientIDs []string
|
||||
for rows.Next() {
|
||||
var rid string
|
||||
if rows.Scan(&rid) == nil {
|
||||
recipientIDs = append(recipientIDs, rid)
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
|
||||
broadcastPayload := map[string]interface{}{
|
||||
"message": body.Message,
|
||||
"sender_id": senderID,
|
||||
"sender": senderName,
|
||||
}
|
||||
|
||||
// Persist broadcast_receive in each recipient's activity log + emit WS event.
|
||||
delivered := 0
|
||||
for _, rid := range recipientIDs {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
|
||||
VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')
|
||||
`, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil {
|
||||
log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err)
|
||||
continue
|
||||
}
|
||||
h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload)
|
||||
delivered++
|
||||
}
|
||||
|
||||
// Record the send on the sender's own log.
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
|
||||
VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')
|
||||
`, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil {
|
||||
log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "sent",
|
||||
"delivered": delivered,
|
||||
})
|
||||
}
|
||||
|
||||
func broadcastTruncate(s string, max int) string {
|
||||
runes := []rune(s)
|
||||
if len(runes) <= max {
|
||||
return s
|
||||
}
|
||||
return string(runes[:max]) + "…"
|
||||
}
|
||||
@@ -0,0 +1,442 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ---------- Validation ----------
|
||||
|
||||
func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
// No DB calls expected — validation fails first.
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/not-a-uuid/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["error"] != "invalid workspace ID" {
|
||||
t.Errorf("unexpected error: %v", resp["error"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_MissingMessage(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_EmptyMessage(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":""}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
// binding:"required" rejects empty string.
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Sender lookup ----------
|
||||
|
||||
func TestBroadcast_NotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_Disabled(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Workspace", false))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusForbidden {
|
||||
t.Errorf("expected 403, got %d", w.Code)
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["error"] != "broadcast_disabled" {
|
||||
t.Errorf("expected error 'broadcast_disabled', got %v", resp["error"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Recipient collection ----------
|
||||
|
||||
func TestBroadcast_EmptyRecipients_NoOtherWorkspaces(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
// Sender lookup succeeds.
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Solo Workspace", true))
|
||||
|
||||
// No other workspaces.
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE status != 'removed' AND id != \$1`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
// Only sender's own activity log.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["status"] != "sent" {
|
||||
t.Errorf("expected status 'sent', got %v", resp["status"])
|
||||
}
|
||||
if resp["delivered"].(float64) != 0 {
|
||||
t.Errorf("expected delivered=0, got %v", resp["delivered"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_Success_SingleRecipient(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
recipientID := "00000000-0000-0000-0000-000000000002"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Sender Workspace", true))
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE status != 'removed' AND id != \$1`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(recipientID))
|
||||
|
||||
// Recipient activity log.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(recipientID, senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// Sender activity log.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello everyone"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["delivered"].(float64) != 1 {
|
||||
t.Errorf("expected delivered=1, got %v", resp["delivered"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_Success_MultipleRecipients(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
recipient1 := "00000000-0000-0000-0000-000000000002"
|
||||
recipient2 := "00000000-0000-0000-0000-000000000003"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Sender", true))
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE status != 'removed' AND id != \$1`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(recipient1).AddRow(recipient2))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(recipient1, senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(recipient2, senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello all"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["delivered"].(float64) != 2 {
|
||||
t.Errorf("expected delivered=2, got %v", resp["delivered"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Error handling ----------
|
||||
|
||||
func TestBroadcast_RecipientQueryError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Sender", true))
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE status != 'removed' AND id != \$1`).
|
||||
WithArgs(senderID).
|
||||
WillReturnError(errors.New("connection lost"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
// Returns 500 on recipient query failure (sender activity log not written either).
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_RecipientRowsError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Sender", true))
|
||||
|
||||
// Return rows with an error on rows.Err().
|
||||
rows := sqlmock.NewRows([]string{"id"}).AddRow("r1").RowError(0, errors.New("scan error"))
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE status != 'removed' AND id != \$1`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(rows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_ActivityLogInsertFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
recipientID := "00000000-0000-0000-0000-000000000002"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Sender", true))
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE status != 'removed' AND id != \$1`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(recipientID))
|
||||
|
||||
// Recipient activity log fails — handler logs and continues.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(recipientID, senderID, sqlmock.AnyArg()).
|
||||
WillReturnError(errors.New("insert failed"))
|
||||
|
||||
// Sender activity log succeeds.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(senderID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
c.Request = httptest.NewRequest("POST", "/broadcast", bytes.NewBufferString(`{"message":"hello"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
// Still returns 200 — recipient failures are logged and skipped.
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200 (fail-open), got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
// delivered=0 because the only recipient's insert failed.
|
||||
if resp["delivered"].(float64) != 0 {
|
||||
t.Errorf("expected delivered=0, got %v", resp["delivered"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- broadcastTruncate ----------
|
||||
|
||||
func TestBroadcast_Truncate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
s string
|
||||
max int
|
||||
want string
|
||||
}{
|
||||
{"empty string", "", 10, ""},
|
||||
{"under limit", "hello", 10, "hello"},
|
||||
{"exact limit", "hello", 5, "hello"},
|
||||
{"over limit", "hello world", 5, "hello…"},
|
||||
{"unicode over limit", "こんにちは世界", 5, "こんにちは…"},
|
||||
{"exactly at limit", "abcde", 5, "abcde"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := broadcastTruncate(tc.s, tc.max)
|
||||
if got != tc.want {
|
||||
t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.s, tc.max, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,7 @@ var wsColumns = []string{
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
|
||||
// ==================== GET — financial fields stripped from open endpoint ====================
|
||||
@@ -52,8 +53,10 @@ func TestWorkspaceBudget_Get_NilLimit(t *testing.T) {
|
||||
[]byte(`{}`), "http://localhost:9001",
|
||||
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
|
||||
0.0, 0.0, false,
|
||||
nil, // budget_limit NULL
|
||||
0)) // monthly_spend 0
|
||||
nil, // budget_limit NULL
|
||||
0, // monthly_spend 0
|
||||
false, // broadcast_enabled
|
||||
true)) // talk_to_user_enabled
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -96,7 +99,8 @@ func TestWorkspaceBudget_Get_WithLimit(t *testing.T) {
|
||||
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
|
||||
0.0, 0.0, false,
|
||||
int64(500), // budget_limit = $5.00 in DB
|
||||
int64(123))) // monthly_spend = $1.23 in DB
|
||||
int64(123), // monthly_spend = $1.23 in DB
|
||||
false, true)) // broadcast_enabled, talk_to_user_enabled
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
@@ -805,6 +805,9 @@ func loadWorkspaceSecrets(ctx context.Context, workspaceID string) (map[string]s
|
||||
envVars[k] = string(decrypted)
|
||||
}
|
||||
}
|
||||
if err := globalRows.Err(); err != nil {
|
||||
log.Printf("Provisioner: global_secrets rows.Err workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
}
|
||||
wsRows, err := db.DB.QueryContext(ctx,
|
||||
`SELECT key, encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1`, workspaceID)
|
||||
@@ -823,6 +826,9 @@ func loadWorkspaceSecrets(ctx context.Context, workspaceID string) (map[string]s
|
||||
envVars[k] = string(decrypted)
|
||||
}
|
||||
}
|
||||
if err := wsRows.Err(); err != nil {
|
||||
log.Printf("Provisioner: workspace_secrets rows.Err workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
}
|
||||
return envVars, ""
|
||||
}
|
||||
|
||||
@@ -144,7 +144,6 @@ func TestProvisionWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
|
||||
rec := &trackingCPProv{startErr: errors.New("simulated CP rejection")}
|
||||
bcast := &concurrentSafeBroadcaster{}
|
||||
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, h)
|
||||
h.SetCPProvisioner(rec)
|
||||
|
||||
wsID := "ws-routes-to-cp-0123456789abcdef"
|
||||
@@ -596,7 +595,6 @@ func TestRestartWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
|
||||
|
||||
// Mock DB so cpStopWithRetry can run without a real Postgres.
|
||||
mock := setupTestDB(t)
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, h)
|
||||
mock.MatchExpectationsInOrder(false)
|
||||
// provisionWorkspaceCP runs in the goroutine and will hit secrets
|
||||
// SELECTs + UPDATE workspace as failed (we make CP Start return
|
||||
@@ -672,7 +670,6 @@ func TestRestartWorkspaceAuto_RoutesToDockerWhenOnlyDocker(t *testing.T) {
|
||||
|
||||
bcast := &concurrentSafeBroadcaster{}
|
||||
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, h)
|
||||
stub := &stoppingLocalProv{}
|
||||
h.provisioner = stub
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -635,11 +634,6 @@ func TestSeedInitialMemories_EmptyMemoriesNil(t *testing.T) {
|
||||
// ==================== buildProvisionerConfig ====================
|
||||
|
||||
func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
|
||||
WithArgs("ws-basic").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"workspace_dir", "workspace_access"}).AddRow("", "none"))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
tmpDir := t.TempDir()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", tmpDir)
|
||||
@@ -684,14 +678,6 @@ func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
|
||||
WithArgs("ws-env").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT digest FROM runtime_image_pins`).
|
||||
WithArgs("claude-code").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs("cccccccc-0001-0000-0000-000000000000").
|
||||
@@ -36,7 +37,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
|
||||
AddRow("cccccccc-0001-0000-0000-000000000000", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`),
|
||||
"http://localhost:8001", nil, 2, 1, 0.05, "", 3600, "working", "langgraph",
|
||||
"", 10.0, 20.0, false,
|
||||
nil, 0))
|
||||
nil, 0, false, true))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -118,6 +119,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs(id).
|
||||
@@ -125,7 +127,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
|
||||
AddRow(id, "Old Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
|
||||
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
|
||||
"", 0.0, 0.0, false,
|
||||
nil, 0))
|
||||
nil, 0, false, true))
|
||||
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"updated_at"}).AddRow(removedAt))
|
||||
@@ -181,6 +183,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs(id).
|
||||
@@ -188,7 +191,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
|
||||
AddRow(id, "Vanished", "worker", 1, string(models.StatusRemoved), []byte(`null`),
|
||||
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
|
||||
"", 0.0, 0.0, false,
|
||||
nil, 0))
|
||||
nil, 0, false, true))
|
||||
// Simulate the row vanishing between the two queries.
|
||||
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
@@ -243,6 +246,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs(id).
|
||||
@@ -250,7 +254,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
|
||||
AddRow(id, "Audit Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
|
||||
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
|
||||
"", 0.0, 0.0, false,
|
||||
nil, 0))
|
||||
nil, 0, false, true))
|
||||
// last_outbound_at follow-up query (existing path)
|
||||
mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
@@ -676,6 +680,7 @@ func TestWorkspaceList_Empty(t *testing.T) {
|
||||
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@@ -1379,6 +1384,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
// Populate with non-zero financial values to confirm they are stripped.
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
@@ -1387,7 +1393,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
|
||||
AddRow("cccccccc-0010-0000-0000-000000000000", "Finance Test", "worker", 1, "online", []byte(`{}`),
|
||||
"http://localhost:9001", nil, 0, 1, 0.0, "", 0, "", "langgraph",
|
||||
"", 0.0, 0.0, false,
|
||||
int64(50000), int64(12500))) // budget_limit=500 USD, spend=125 USD
|
||||
int64(50000), int64(12500), false, true)) // budget_limit=500 USD, spend=125 USD
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -1435,6 +1441,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs("cccccccc-0955-0000-0000-000000000000").
|
||||
@@ -1447,7 +1454,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
|
||||
"langgraph",
|
||||
"/home/user/secret-projects/client-work",
|
||||
0.0, 0.0, false,
|
||||
nil, 0))
|
||||
nil, 0, false, true))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
@@ -36,6 +36,15 @@ type Workspace struct {
|
||||
// to activity_logs, agent reads via GET /activity?since_id=). See
|
||||
// migration 045 + RFC #2339.
|
||||
DeliveryMode string `json:"delivery_mode" db:"delivery_mode"`
|
||||
// BroadcastEnabled: when true the workspace may call POST /broadcast to
|
||||
// deliver a message to all non-removed agent workspaces in the org.
|
||||
// Default false — only privileged orchestrators should hold this ability.
|
||||
BroadcastEnabled bool `json:"broadcast_enabled" db:"broadcast_enabled"`
|
||||
// TalkToUserEnabled: when false the workspace's send_message_to_user calls
|
||||
// and POST /notify requests are rejected with HTTP 403 so the agent is
|
||||
// forced to route updates through a parent workspace. Default true
|
||||
// (preserves existing behaviour for all workspaces).
|
||||
TalkToUserEnabled bool `json:"talk_to_user_enabled" db:"talk_to_user_enabled"`
|
||||
// Canvas layout fields (from JOIN)
|
||||
X float64 `json:"x"`
|
||||
Y float64 `json:"y"`
|
||||
|
||||
@@ -4,12 +4,14 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -156,6 +158,11 @@ type cpProvisionRequest struct {
|
||||
Tier int `json:"tier"`
|
||||
PlatformURL string `json:"platform_url"`
|
||||
Env map[string]string `json:"env"`
|
||||
// ConfigFiles are template + generated config files to write into the
|
||||
// EC2 instance's /configs directory. OFFSEC-010: collected by
|
||||
// collectCPConfigFiles which rejects symlinks and non-regular files
|
||||
// before including them. Serialised as base64 to avoid JSON escaping.
|
||||
ConfigFiles map[string]string `json:"config_files,omitempty"`
|
||||
}
|
||||
|
||||
type cpProvisionResponse struct {
|
||||
@@ -179,6 +186,16 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
|
||||
}
|
||||
env["ADMIN_TOKEN"] = p.adminToken
|
||||
}
|
||||
// Collect template files and generated configs, with OFFSEC-010 guards:
|
||||
// - Rejects symlinks at the template root (prevents bypass via symlink traversal)
|
||||
// - Skips symlinks during WalkDir (prevents /etc/passwd etc. inclusion)
|
||||
// - Validates all paths are relative and non-escaping
|
||||
// - Caps total size at 12 KiB to prevent payload bloat
|
||||
configFiles, err := collectCPConfigFiles(cfg)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cp provisioner: collect config files: %w", err)
|
||||
}
|
||||
|
||||
req := cpProvisionRequest{
|
||||
OrgID: p.orgID,
|
||||
WorkspaceID: cfg.WorkspaceID,
|
||||
@@ -186,6 +203,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
|
||||
Tier: cfg.Tier,
|
||||
PlatformURL: cfg.PlatformURL,
|
||||
Env: env,
|
||||
ConfigFiles: configFiles,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(req)
|
||||
@@ -237,6 +255,94 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
|
||||
return result.InstanceID, nil
|
||||
}
|
||||
|
||||
const cpConfigFilesMaxBytes = 12 << 10
|
||||
|
||||
// isCPTemplateConfigFile restricts which files from a template directory are
|
||||
// eligible for transport to the control plane. Only config.yaml (the runtime
|
||||
// entrypoint config) and files under prompts/ (system prompts) are needed;
|
||||
// shipping arbitrary files (e.g. adapter.py, Dockerfile) is both unnecessary
|
||||
// and a potential data-exfiltration surface.
|
||||
func isCPTemplateConfigFile(name string) bool {
|
||||
name = filepath.ToSlash(filepath.Clean(name))
|
||||
return name == "config.yaml" || strings.HasPrefix(name, "prompts/")
|
||||
}
|
||||
|
||||
func collectCPConfigFiles(cfg WorkspaceConfig) (map[string]string, error) {
|
||||
files := make(map[string]string)
|
||||
total := 0
|
||||
addFile := func(name string, data []byte) error {
|
||||
name = filepath.ToSlash(filepath.Clean(name))
|
||||
if name == "." || strings.HasPrefix(name, "../") || strings.HasPrefix(name, "/") || strings.Contains(name, "/../") {
|
||||
return fmt.Errorf("invalid config file path %q", name)
|
||||
}
|
||||
total += len(data)
|
||||
if total > cpConfigFilesMaxBytes {
|
||||
return fmt.Errorf("config files exceed %d bytes", cpConfigFilesMaxBytes)
|
||||
}
|
||||
files[name] = base64.StdEncoding.EncodeToString(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
if cfg.TemplatePath != "" {
|
||||
// Reject symlinks on the root itself — WalkDir follows symlinks,
|
||||
// so a symlink TemplatePath that escapes the intended root directory
|
||||
// would bypass the subsequent path-relativization checks below.
|
||||
rootInfo, err := os.Lstat(cfg.TemplatePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("collectCPConfigFiles: lstat template path: %w", err)
|
||||
}
|
||||
if rootInfo.Mode()&os.ModeSymlink != 0 {
|
||||
return nil, fmt.Errorf("collectCPConfigFiles: template path must not be a symlink")
|
||||
}
|
||||
err = filepath.WalkDir(cfg.TemplatePath, func(path string, d os.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
// Skip symlinks — WalkDir follows them by default, which means
|
||||
// a symlink inside the template dir pointing to /etc/passwd
|
||||
// would be traversed even though the resulting relative-path
|
||||
// check would correctly reject it. Defense-in-depth: don't
|
||||
// follow symlinks at all. (OFFSEC-010)
|
||||
if d.Type()&os.ModeSymlink != 0 {
|
||||
return nil
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.Mode().IsRegular() {
|
||||
return nil
|
||||
}
|
||||
rel, err := filepath.Rel(cfg.TemplatePath, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !isCPTemplateConfigFile(rel) {
|
||||
return nil
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return addFile(rel, data)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for name, data := range cfg.ConfigFiles {
|
||||
if err := addFile(name, data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(files) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return files, nil
|
||||
}
|
||||
// Stop terminates the workspace's EC2 instance via the control plane.
|
||||
//
|
||||
// Looks up the actual EC2 instance_id from the workspaces table before
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package provisioner
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -279,6 +283,105 @@ func TestStart_TransportFailureSurfaces(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestStart_CollectsConfigFiles — verify that collectCPConfigFiles is called and
|
||||
// its result is included in the cpProvisionRequest sent to the control plane.
|
||||
// Tests the OFFSEC-010 wiring: the function's symlink guards are only effective
|
||||
// if the call site actually invokes it.
|
||||
func TestStart_CollectsConfigFiles(t *testing.T) {
|
||||
tmpl := t.TempDir()
|
||||
if err := os.WriteFile(filepath.Join(tmpl, "config.yaml"), []byte("name: test\n"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// adapter.py is within the size limit but is NOT config.yaml or prompts/,
|
||||
// so isCPTemplateConfigFile must exclude it from the transport.
|
||||
if err := os.WriteFile(filepath.Join(tmpl, "adapter.py"), bytes.Repeat([]byte("x"), cpConfigFilesMaxBytes), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var gotBody cpProvisionRequest
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_ = json.NewDecoder(r.Body).Decode(&gotBody)
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
_, _ = io.WriteString(w, `{"instance_id":"i-abc123","state":"pending"}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
|
||||
_, err := p.Start(context.Background(), WorkspaceConfig{
|
||||
WorkspaceID: "ws-1",
|
||||
Runtime: "python",
|
||||
Tier: 1,
|
||||
PlatformURL: "http://tenant",
|
||||
TemplatePath: tmpl,
|
||||
ConfigFiles: map[string][]byte{"generated.json": []byte(`{"key":"value"}`)},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Start: %v", err)
|
||||
}
|
||||
|
||||
// config.yaml from TemplatePath must be base64-encoded in ConfigFiles
|
||||
if len(gotBody.ConfigFiles) == 0 {
|
||||
t.Fatal("ConfigFiles is empty: collectCPConfigFiles was not called")
|
||||
}
|
||||
|
||||
// Find config.yaml entry and verify it's valid base64 + correct content
|
||||
var foundTemplate, foundGenerated bool
|
||||
for name, encoded := range gotBody.ConfigFiles {
|
||||
decoded, err := base64.StdEncoding.DecodeString(encoded)
|
||||
if err != nil {
|
||||
t.Errorf("ConfigFiles[%q] is not valid base64: %v", name, err)
|
||||
continue
|
||||
}
|
||||
if name == "config.yaml" && string(decoded) == "name: test\n" {
|
||||
foundTemplate = true
|
||||
}
|
||||
if name == "generated.json" && string(decoded) == `{"key":"value"}` {
|
||||
foundGenerated = true
|
||||
}
|
||||
}
|
||||
if !foundTemplate {
|
||||
t.Errorf("ConfigFiles missing config.yaml from TemplatePath")
|
||||
}
|
||||
if !foundGenerated {
|
||||
t.Errorf("ConfigFiles missing generated.json from ConfigFiles")
|
||||
}
|
||||
// adapter.py must NOT be in ConfigFiles — isCPTemplateConfigFile filters it out
|
||||
for name := range gotBody.ConfigFiles {
|
||||
if name == "adapter.py" {
|
||||
t.Errorf("adapter.py should not be in ConfigFiles — isCPTemplateConfigFile must filter it out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestStart_SymlinkTemplatePathError — a symlink TemplatePath should cause
|
||||
// collectCPConfigFiles to return an error, which Start must propagate.
|
||||
// Without this wiring, OFFSEC-010's root-symlink guard is dead code.
|
||||
func TestStart_SymlinkTemplatePathError(t *testing.T) {
|
||||
// Create a temp file and a symlink pointing to it
|
||||
tmp := t.TempDir()
|
||||
realFile := filepath.Join(tmp, "real")
|
||||
if err := os.WriteFile(realFile, []byte("data"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
symlink := filepath.Join(tmp, "template_link")
|
||||
if err := os.Symlink(realFile, symlink); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
p := &CPProvisioner{baseURL: "http://unused", orgID: "org-1", httpClient: &http.Client{Timeout: time.Second}}
|
||||
_, err := p.Start(context.Background(), WorkspaceConfig{
|
||||
WorkspaceID: "ws-1",
|
||||
Runtime: "python",
|
||||
TemplatePath: symlink, // symlink root → OFFSEC-010 guard should fire
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected error for symlink TemplatePath, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "symlink") {
|
||||
t.Errorf("error should mention symlink, got %q", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// TestStop_SendsBothAuthHeaders — verify #118/#130 compliance on the
|
||||
// teardown path. Any call to /cp/workspaces/:id must carry both the
|
||||
// platform-wide shared secret AND the per-tenant admin token, or the
|
||||
@@ -842,3 +945,67 @@ func TestIsRunning_EmptyInstanceIDReturnsFalse(t *testing.T) {
|
||||
t.Errorf("IsRunning with empty instance_id should return running=false, got true")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCollectCPConfigFiles_SkipsSymlinks — WalkDir follows symlinks by default,
|
||||
// but collectCPConfigFiles must skip them so a symlink inside a template dir
|
||||
// pointing outside (e.g. ln -s /etc snapshot) cannot be traversed.
|
||||
// Verifies OFFSEC-010 defense-in-depth fix. (OFFSEC-010)
|
||||
func TestCollectCPConfigFiles_SkipsSymlinks(t *testing.T) {
|
||||
tmpl := t.TempDir()
|
||||
// Write a real file that should be included.
|
||||
if err := os.WriteFile(filepath.Join(tmpl, "config.yaml"), []byte("name: real\n"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Create a subdir with a file that will be symlinked-outside.
|
||||
sensitiveDir := t.TempDir()
|
||||
if err := os.WriteFile(filepath.Join(sensitiveDir, "secret.txt"), []byte("SENSITIVE\n"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Symlink inside template dir pointing to outside path.
|
||||
symlinkPath := filepath.Join(tmpl, "snapshot")
|
||||
if err := os.Symlink(sensitiveDir, symlinkPath); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
files, err := collectCPConfigFiles(WorkspaceConfig{TemplatePath: tmpl})
|
||||
if err != nil {
|
||||
t.Fatalf("collectCPConfigFiles: %v", err)
|
||||
}
|
||||
if files == nil {
|
||||
t.Fatal("files should not be nil")
|
||||
}
|
||||
// config.yaml must be present.
|
||||
if _, ok := files["config.yaml"]; !ok {
|
||||
t.Errorf("config.yaml missing from files")
|
||||
}
|
||||
// The symlinked path must NOT be included (even though WalkDir would
|
||||
// traverse it, the d.Type()&os.ModeSymlink guard skips the entry).
|
||||
for k := range files {
|
||||
if strings.Contains(k, "snapshot") || strings.Contains(k, "secret") {
|
||||
t.Errorf("symlink path %q should not be in files — OFFSEC-010 regression", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestCollectCPConfigFiles_RejectsRootSymlink — if cfg.TemplatePath itself is
|
||||
// a symlink, WalkDir would follow it to an arbitrary directory, bypassing the
|
||||
// cfg.TemplatePath boundary. The function must reject this case explicitly.
|
||||
// (OFFSEC-010)
|
||||
func TestCollectCPConfigFiles_RejectsRootSymlink(t *testing.T) {
|
||||
real := t.TempDir()
|
||||
if err := os.WriteFile(filepath.Join(real, "config.yaml"), []byte("name: real\n"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
link := filepath.Join(t.TempDir(), "template-link")
|
||||
if err := os.Symlink(real, link); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err := collectCPConfigFiles(WorkspaceConfig{TemplatePath: link})
|
||||
if err == nil {
|
||||
t.Error("collectCPConfigFiles with symlink TemplatePath should return error")
|
||||
}
|
||||
if err != nil && !strings.Contains(err.Error(), "symlink") {
|
||||
t.Errorf("expected symlink-related error, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,24 +62,6 @@ func TestValidateConfigSource_TemplateIsDirName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartSeedsConfigsBeforeContainerStart(t *testing.T) {
|
||||
src, err := os.ReadFile("provisioner.go")
|
||||
if err != nil {
|
||||
t.Fatalf("read provisioner.go: %v", err)
|
||||
}
|
||||
text := string(src)
|
||||
copyTemplate := strings.Index(text, "p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath)")
|
||||
writeFiles := strings.Index(text, "p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles)")
|
||||
start := strings.Index(text, "p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{})")
|
||||
|
||||
if copyTemplate < 0 || writeFiles < 0 || start < 0 {
|
||||
t.Fatalf("expected Start to copy template, write config files, and start container")
|
||||
}
|
||||
if copyTemplate >= start || writeFiles >= start {
|
||||
t.Fatalf("config seeding must happen before ContainerStart: copyTemplate=%d writeFiles=%d start=%d", copyTemplate, writeFiles, start)
|
||||
}
|
||||
}
|
||||
|
||||
// baseHostConfig returns a fresh HostConfig with typical pre-tier binds,
|
||||
// mimicking what Start() builds before calling ApplyTierConfig.
|
||||
func baseHostConfig(pluginsPath string) *container.HostConfig {
|
||||
|
||||
@@ -14,9 +14,8 @@ func setupMockDB(t *testing.T) sqlmock.Sqlmock {
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
|
||||
t.Cleanup(func() { mockDB.Close() })
|
||||
return mock
|
||||
}
|
||||
|
||||
|
||||
@@ -31,9 +31,8 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
|
||||
t.Cleanup(func() { mockDB.Close() })
|
||||
return mock
|
||||
}
|
||||
|
||||
|
||||
@@ -17,9 +17,8 @@ func setupHibernationMock(t *testing.T) sqlmock.Sqlmock {
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock.New: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
|
||||
t.Cleanup(func() { mockDB.Close() })
|
||||
return mock
|
||||
}
|
||||
|
||||
|
||||
@@ -18,9 +18,8 @@ func setupLivenessTestDB(t *testing.T) sqlmock.Sqlmock {
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
|
||||
t.Cleanup(func() { mockDB.Close() })
|
||||
return mock
|
||||
}
|
||||
|
||||
|
||||
@@ -146,6 +146,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
wsAdmin.GET("/workspaces", wh.List)
|
||||
wsAdmin.POST("/workspaces", wh.Create)
|
||||
wsAdmin.DELETE("/workspaces/:id", wh.Delete)
|
||||
// Ability toggles — admin-only so workspace agents cannot self-modify
|
||||
// broadcast_enabled or talk_to_user_enabled.
|
||||
wsAdmin.PATCH("/workspaces/:id/abilities", handlers.PatchAbilities)
|
||||
// Out-of-band bootstrap signal: CP's watcher POSTs here when it
|
||||
// detects "RUNTIME CRASHED" in a workspace EC2 console output,
|
||||
// so the canvas flips to failed in seconds instead of waiting
|
||||
@@ -201,6 +204,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
|
||||
wsAuth.POST("/hibernate", wh.Hibernate)
|
||||
|
||||
// Broadcast — send a message to all non-removed workspaces in the org.
|
||||
// Requires broadcast_enabled=true on the source workspace (checked
|
||||
// inside the handler). WorkspaceAuth on wsAuth proves token ownership.
|
||||
broadcastH := handlers.NewBroadcastHandler(broadcaster)
|
||||
wsAuth.POST("/broadcast", broadcastH.Broadcast)
|
||||
|
||||
// External-workspace credential lifecycle (issue #319 follow-up to
|
||||
// the Create flow). Both endpoints reject runtime ≠ external with
|
||||
// 400 — see external_rotate.go for the rationale.
|
||||
|
||||
@@ -24,9 +24,8 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
|
||||
t.Cleanup(func() { mockDB.Close() })
|
||||
return mock
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
ALTER TABLE workspaces
|
||||
DROP COLUMN IF EXISTS broadcast_enabled,
|
||||
DROP COLUMN IF EXISTS talk_to_user_enabled;
|
||||
@@ -0,0 +1,16 @@
|
||||
-- Workspace abilities: opt-in flags that gate platform-level behaviours.
|
||||
--
|
||||
-- broadcast_enabled (default FALSE): when TRUE the workspace may call
|
||||
-- POST /workspaces/:id/broadcast to send a message to every non-removed
|
||||
-- agent workspace in the org. Off by default — only privileged
|
||||
-- orchestrator workspaces should hold this ability.
|
||||
--
|
||||
-- talk_to_user_enabled (default TRUE): when FALSE the workspace is not
|
||||
-- allowed to deliver messages to the canvas user via send_message_to_user /
|
||||
-- POST /notify. The platform returns HTTP 403 so the agent can forward its
|
||||
-- update to a parent workspace instead. Default TRUE preserves existing
|
||||
-- behaviour for all current workspaces.
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ADD COLUMN IF NOT EXISTS broadcast_enabled BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
ADD COLUMN IF NOT EXISTS talk_to_user_enabled BOOLEAN NOT NULL DEFAULT TRUE;
|
||||
@@ -40,6 +40,8 @@ _A2A_BOUNDARY_END = "[/A2A_RESULT_FROM_PEER]"
|
||||
# inside the trusted zone. Escape BOTH boundary markers in the raw text
|
||||
# before wrapping so they can never close the boundary early.
|
||||
# We use "[/ " as the escape prefix — visually distinct from the real marker.
|
||||
_A2A_BOUNDARY_START_ESCAPED = "[/ A2A_RESULT_FROM_PEER]"
|
||||
_A2A_BOUNDARY_END_ESCAPED = "[/ /A2A_RESULT_FROM_PEER]"
|
||||
|
||||
|
||||
def _escape_boundary_markers(text: str) -> str:
|
||||
@@ -50,8 +52,8 @@ def _escape_boundary_markers(text: str) -> str:
|
||||
the boundary early or inject a fake opener.
|
||||
"""
|
||||
return (
|
||||
text.replace(_A2A_BOUNDARY_START, "[/ A2A_RESULT_FROM_PEER]")
|
||||
.replace(_A2A_BOUNDARY_END, "[/ /A2A_RESULT_FROM_PEER]")
|
||||
text.replace(_A2A_BOUNDARY_START, _A2A_BOUNDARY_START_ESCAPED)
|
||||
.replace(_A2A_BOUNDARY_END, _A2A_BOUNDARY_END_ESCAPED)
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ from typing import Callable
|
||||
import inbox
|
||||
|
||||
from a2a_tools import (
|
||||
tool_broadcast_message,
|
||||
tool_chat_history,
|
||||
tool_check_task_status,
|
||||
tool_commit_memory,
|
||||
@@ -160,6 +161,11 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
|
||||
arguments.get("before_ts", ""),
|
||||
source_workspace_id=arguments.get("source_workspace_id") or None,
|
||||
)
|
||||
elif name == "broadcast_message":
|
||||
return await tool_broadcast_message(
|
||||
arguments.get("message", ""),
|
||||
workspace_id=arguments.get("workspace_id") or None,
|
||||
)
|
||||
return f"Unknown tool: {name}"
|
||||
|
||||
|
||||
@@ -686,8 +692,8 @@ def _format_channel_content(
|
||||
# --- MCP Server (JSON-RPC over stdio) ---
|
||||
|
||||
|
||||
def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
|
||||
"""Assert that stdio fds are pipe/socket/char-device compatible.
|
||||
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
|
||||
"""Warn when stdio isn't a pipe — but continue anyway.
|
||||
|
||||
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
|
||||
rejected regular files, PTYs, and sockets with:
|
||||
@@ -711,10 +717,6 @@ def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> N
|
||||
)
|
||||
|
||||
|
||||
# Deprecated alias — the canonical name is _assert_stdio_is_pipe_compatible.
|
||||
_warn_if_stdio_not_pipe = _assert_stdio_is_pipe_compatible
|
||||
|
||||
|
||||
async def main(): # pragma: no cover
|
||||
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
|
||||
|
||||
@@ -971,7 +973,7 @@ def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no
|
||||
if transport == "http":
|
||||
asyncio.run(_run_http_server(port))
|
||||
else:
|
||||
_assert_stdio_is_pipe_compatible()
|
||||
_warn_if_stdio_not_pipe()
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
|
||||
@@ -137,6 +137,7 @@ from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_cli
|
||||
# identically.
|
||||
from a2a_tools_messaging import ( # noqa: E402 (import after the top-of-module imports)
|
||||
_upload_chat_files,
|
||||
tool_broadcast_message,
|
||||
tool_chat_history,
|
||||
tool_get_workspace_info,
|
||||
tool_list_peers,
|
||||
|
||||
@@ -49,7 +49,9 @@ from a2a_client import (
|
||||
from a2a_tools_rbac import auth_headers_for_heartbeat as _auth_headers_for_heartbeat
|
||||
from _sanitize_a2a import (
|
||||
_A2A_BOUNDARY_END,
|
||||
_A2A_BOUNDARY_END_ESCAPED,
|
||||
_A2A_BOUNDARY_START,
|
||||
_A2A_BOUNDARY_START_ESCAPED,
|
||||
sanitize_a2a_result,
|
||||
) # noqa: E402
|
||||
|
||||
@@ -330,8 +332,18 @@ async def tool_delegate_task(
|
||||
# markers so the agent can distinguish trusted (own output) from untrusted
|
||||
# (peer-supplied) content. Explicit wrapping here rather than inside
|
||||
# sanitize_a2a_result preserves a clean separation of concerns.
|
||||
#
|
||||
# Truncate at the closer BEFORE sanitizing so the raw closer (which gets
|
||||
# lost during escaping) is removed from the content. After truncation,
|
||||
# sanitize the remaining text and wrap with escaped boundary markers.
|
||||
if _A2A_BOUNDARY_END in result:
|
||||
result = result[:result.index(_A2A_BOUNDARY_END)]
|
||||
escaped = sanitize_a2a_result(result)
|
||||
return f"{_A2A_BOUNDARY_START}\n{escaped}\n{_A2A_BOUNDARY_END}"
|
||||
return (
|
||||
f"{_A2A_BOUNDARY_START_ESCAPED}\n"
|
||||
f"{escaped}\n"
|
||||
f"{_A2A_BOUNDARY_END_ESCAPED}"
|
||||
)
|
||||
|
||||
|
||||
async def tool_delegate_task_async(
|
||||
|
||||
@@ -101,6 +101,50 @@ async def _upload_chat_files(
|
||||
return uploaded, None
|
||||
|
||||
|
||||
async def tool_broadcast_message(
|
||||
message: str,
|
||||
workspace_id: str | None = None,
|
||||
) -> str:
|
||||
"""Send a broadcast message to ALL agent workspaces in the org.
|
||||
|
||||
Requires the workspace to have broadcast_enabled=true (set by a user or
|
||||
admin via PATCH /workspaces/:id/abilities). Use for urgent org-wide
|
||||
signals — status changes, critical alerts, coordination instructions.
|
||||
Every non-removed workspace receives the message in its activity log so
|
||||
poll-mode agents pick it up, and push-mode canvases get a real-time
|
||||
BROADCAST_MESSAGE WebSocket event.
|
||||
|
||||
Args:
|
||||
message: The broadcast text. Keep it concise — all agents receive
|
||||
this, so avoid lengthy prose that floods every context.
|
||||
workspace_id: Optional. Which registered workspace to send the
|
||||
broadcast from. Single-workspace agents omit this.
|
||||
"""
|
||||
if not message:
|
||||
return "Error: message is required"
|
||||
target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
resp = await client.post(
|
||||
f"{PLATFORM_URL}/workspaces/{target_workspace_id}/broadcast",
|
||||
json={"message": message},
|
||||
headers=_auth_headers_for_heartbeat(target_workspace_id),
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
delivered = data.get("delivered", "?")
|
||||
return f"Broadcast sent to {delivered} workspace(s)"
|
||||
if resp.status_code == 403:
|
||||
try:
|
||||
hint = resp.json().get("hint", "")
|
||||
except Exception:
|
||||
hint = ""
|
||||
return f"Error: broadcast ability not enabled.{(' ' + hint) if hint else ''}"
|
||||
return f"Error: platform returned {resp.status_code}"
|
||||
except Exception as e:
|
||||
return f"Error sending broadcast: {e}"
|
||||
|
||||
|
||||
async def tool_send_message_to_user(
|
||||
message: str,
|
||||
attachments: list[str] | None = None,
|
||||
@@ -151,6 +195,20 @@ async def tool_send_message_to_user(
|
||||
if uploaded:
|
||||
return f"Message sent to user with {len(uploaded)} attachment(s)"
|
||||
return "Message sent to user"
|
||||
if resp.status_code == 403:
|
||||
try:
|
||||
body = resp.json()
|
||||
if body.get("error") == "talk_to_user_disabled":
|
||||
hint = body.get("hint", "")
|
||||
return (
|
||||
"Error: this workspace is not allowed to send messages "
|
||||
"directly to the user (talk_to_user is disabled). "
|
||||
+ (hint + " " if hint else "")
|
||||
+ "Use delegate_task to forward your update to a parent "
|
||||
"or supervisor workspace that can reach the user."
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return f"Error: platform returned {resp.status_code}"
|
||||
except Exception as e:
|
||||
return f"Error sending message: {e}"
|
||||
|
||||
@@ -3,9 +3,57 @@
|
||||
import logging
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider routing — type alias + resolver used by individual adapters.
|
||||
# Each adapter defines its own ProviderRegistry with the providers it accepts.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Maps prefix → (ordered_auth_env_vars, default_base_url).
|
||||
ProviderRegistry = dict[str, tuple[tuple[str, ...], str]]
|
||||
|
||||
|
||||
def resolve_provider_routing(
|
||||
model_str: str,
|
||||
env: Mapping[str, str],
|
||||
*,
|
||||
registry: ProviderRegistry,
|
||||
runtime_config: dict[str, Any] | None = None,
|
||||
) -> tuple[str, str, str]:
|
||||
"""Resolve a ``provider:model`` string to ``(api_key, base_url, bare_model_id)``.
|
||||
|
||||
URL precedence (highest to lowest):
|
||||
1. ``<PREFIX>_BASE_URL`` env var
|
||||
2. ``runtime_config["provider_url"]``
|
||||
3. registry default for the prefix
|
||||
|
||||
Unknown prefixes fall back to OPENAI_API_KEY + api.openai.com.
|
||||
Raises RuntimeError when no API key env var is set for the prefix.
|
||||
"""
|
||||
if ":" in model_str:
|
||||
prefix, model_id = model_str.split(":", 1)
|
||||
else:
|
||||
prefix, model_id = "openai", model_str
|
||||
|
||||
env_vars, default_url = registry.get(
|
||||
prefix, (("OPENAI_API_KEY",), "https://api.openai.com/v1")
|
||||
)
|
||||
api_key = next((env[v] for v in env_vars if env.get(v)), "")
|
||||
if not api_key:
|
||||
raise RuntimeError(
|
||||
f"No API key found for provider {prefix!r} "
|
||||
f"(checked: {', '.join(env_vars)}). Set one in workspace secrets."
|
||||
)
|
||||
|
||||
env_url = env.get(f"{prefix.upper()}_BASE_URL", "")
|
||||
config_url = (runtime_config or {}).get("provider_url", "")
|
||||
base_url = env_url or config_url or default_url
|
||||
|
||||
return api_key, base_url, model_id
|
||||
|
||||
from a2a.server.agent_execution import AgentExecutor
|
||||
|
||||
from event_log import DisabledEventLog, EventLogBackend
|
||||
|
||||
@@ -340,6 +340,10 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
|
||||
"delegate_task_async": "delegate --async",
|
||||
"check_task_status": "status",
|
||||
"get_workspace_info": "info",
|
||||
# `broadcast_message` is not exposed via the CLI subprocess interface
|
||||
# today — it's an MCP-first capability. If a2a_cli grows a `broadcast`
|
||||
# subcommand, map it here and the alignment test will gate the change.
|
||||
"broadcast_message": None,
|
||||
# `send_message_to_user` is not exposed via the CLI subprocess
|
||||
# interface today — it requires a structured `attachments` field
|
||||
# that wouldn't survive a positional-arg shell invocation cleanly.
|
||||
|
||||
@@ -51,6 +51,7 @@ from dataclasses import dataclass
|
||||
from typing import Any, Literal
|
||||
|
||||
from a2a_tools import (
|
||||
tool_broadcast_message,
|
||||
tool_chat_history,
|
||||
tool_check_task_status,
|
||||
tool_commit_memory,
|
||||
@@ -288,6 +289,44 @@ _GET_WORKSPACE_INFO = ToolSpec(
|
||||
section=A2A_SECTION,
|
||||
)
|
||||
|
||||
_BROADCAST_MESSAGE = ToolSpec(
|
||||
name="broadcast_message",
|
||||
short=(
|
||||
"Send a message to ALL agent workspaces in the org simultaneously. "
|
||||
"Requires broadcast_enabled=true on this workspace (set by user/admin)."
|
||||
),
|
||||
when_to_use=(
|
||||
"Use for urgent, org-wide signals: critical status changes, emergency "
|
||||
"stop instructions, coordinated task announcements. Every non-removed "
|
||||
"workspace receives the message in its activity log (poll-mode agents "
|
||||
"see it on their next poll; push-mode canvases get a real-time banner). "
|
||||
"This tool returns an error if broadcast_enabled is false — a user or "
|
||||
"admin must enable it via the workspace abilities settings first."
|
||||
),
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"message": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The broadcast text. Keep it concise — every agent in the "
|
||||
"org receives this in their activity feed."
|
||||
),
|
||||
},
|
||||
"workspace_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"Optional. Multi-workspace mode: the registered workspace "
|
||||
"to broadcast from. Single-workspace agents omit this."
|
||||
),
|
||||
},
|
||||
},
|
||||
"required": ["message"],
|
||||
},
|
||||
impl=tool_broadcast_message,
|
||||
section=A2A_SECTION,
|
||||
)
|
||||
|
||||
_SEND_MESSAGE_TO_USER = ToolSpec(
|
||||
name="send_message_to_user",
|
||||
short=(
|
||||
@@ -603,6 +642,7 @@ TOOLS: list[ToolSpec] = [
|
||||
_CHECK_TASK_STATUS,
|
||||
_LIST_PEERS,
|
||||
_GET_WORKSPACE_INFO,
|
||||
_BROADCAST_MESSAGE,
|
||||
_SEND_MESSAGE_TO_USER,
|
||||
# Inbox (standalone-only; in-container returns informational error)
|
||||
_WAIT_FOR_MESSAGE,
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
- **check_task_status**: Poll the status of a task started with delegate_task_async; returns result when done.
|
||||
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
|
||||
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
|
||||
- **broadcast_message**: Send a message to ALL agent workspaces in the org simultaneously. Requires broadcast_enabled=true on this workspace (set by user/admin).
|
||||
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
|
||||
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
|
||||
- **inbox_peek**: List pending inbound messages without removing them.
|
||||
@@ -26,6 +27,9 @@ Call this first when you need to delegate but don't know the target's ID. Access
|
||||
### get_workspace_info
|
||||
Use to introspect your own identity (e.g. before reporting back to the user, or to determine whether you're a tier-0 root that can write GLOBAL memory).
|
||||
|
||||
### broadcast_message
|
||||
Use for urgent, org-wide signals: critical status changes, emergency stop instructions, coordinated task announcements. Every non-removed workspace receives the message in its activity log (poll-mode agents see it on their next poll; push-mode canvases get a real-time banner). This tool returns an error if broadcast_enabled is false — a user or admin must enable it via the workspace abilities settings first.
|
||||
|
||||
### send_message_to_user
|
||||
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).
|
||||
|
||||
|
||||
@@ -1826,8 +1826,8 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
|
||||
|
||||
|
||||
class TestStdioPipeAssertion:
|
||||
"""Pin _assert_stdio_is_pipe_compatible — the canonical function name.
|
||||
_warn_if_stdio_not_pipe is a deprecated alias.
|
||||
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
|
||||
the old fatal _assert_stdio_is_pipe_compatible guard.
|
||||
|
||||
The universal stdio transport now works with ANY file descriptor
|
||||
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
|
||||
@@ -1838,12 +1838,12 @@ class TestStdioPipeAssertion:
|
||||
|
||||
def test_pipe_pair_passes_silently(self, caplog):
|
||||
"""Happy path — both fds are pipes. No warning emitted."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
|
||||
assert "not a pipe" not in caplog.text
|
||||
finally:
|
||||
os.close(r)
|
||||
@@ -1852,14 +1852,14 @@ class TestStdioPipeAssertion:
|
||||
def test_regular_file_stdout_warns(self, tmp_path, caplog):
|
||||
"""Reproducer for runtime#61: stdout redirected to a regular file.
|
||||
Now emits a warning instead of exiting."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
|
||||
r, _w = os.pipe()
|
||||
regular = tmp_path / "captured.log"
|
||||
f = open(regular, "wb")
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=f.fileno())
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
|
||||
assert "stdout" in caplog.text
|
||||
assert "not a pipe" in caplog.text
|
||||
finally:
|
||||
@@ -1868,7 +1868,7 @@ class TestStdioPipeAssertion:
|
||||
|
||||
def test_regular_file_stdin_warns(self, tmp_path, caplog):
|
||||
"""Symmetric case — stdin redirected from a regular file."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
|
||||
regular = tmp_path / "input.json"
|
||||
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
|
||||
@@ -1876,7 +1876,7 @@ class TestStdioPipeAssertion:
|
||||
_r, w = os.pipe()
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=f.fileno(), stdout_fd=w)
|
||||
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
|
||||
assert "stdin" in caplog.text
|
||||
assert "not a pipe" in caplog.text
|
||||
finally:
|
||||
@@ -1886,13 +1886,13 @@ class TestStdioPipeAssertion:
|
||||
def test_closed_fd_warns_about_stat_error(self, caplog):
|
||||
"""If stdio is closed, os.fstat raises OSError. Warning is
|
||||
skipped silently (can't stat the fd)."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
|
||||
r, w = os.pipe()
|
||||
os.close(w) # Now `w` is a stale fd — fstat will fail.
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
|
||||
# No warning emitted because fstat failed before the check
|
||||
assert "not a pipe" not in caplog.text
|
||||
finally:
|
||||
|
||||
@@ -218,7 +218,8 @@ class TestPollingPathSanitization:
|
||||
result = asyncio.run(d.tool_delegate_task("ws-peer", "do it"))
|
||||
# tool_delegate_task wraps the sanitized text in _A2A_BOUNDARY_START/END
|
||||
# (NOT _A2A_RESULT_FROM_PEER — that marker is for the messaging path).
|
||||
assert d._A2A_BOUNDARY_START in result
|
||||
assert d._A2A_BOUNDARY_END in result
|
||||
# Wrapped in escaped form to prevent raw closer from appearing in output.
|
||||
assert d._A2A_BOUNDARY_START_ESCAPED in result
|
||||
assert d._A2A_BOUNDARY_END_ESCAPED in result
|
||||
assert "Sanitized peer reply" in result
|
||||
|
||||
|
||||
@@ -277,7 +277,7 @@ class TestToolDelegateTask:
|
||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||
result = await a2a_tools.tool_delegate_task("ws-1", "do something")
|
||||
|
||||
assert result == "[A2A_RESULT_FROM_PEER]\nTask completed!\n[/A2A_RESULT_FROM_PEER]"
|
||||
assert result == "[/ A2A_RESULT_FROM_PEER]\nTask completed!\n[/ /A2A_RESULT_FROM_PEER]"
|
||||
|
||||
async def test_error_response_returns_delegation_failed_message(self):
|
||||
"""When send_a2a_message returns _A2A_ERROR_PREFIX text, delegation fails."""
|
||||
@@ -305,7 +305,7 @@ class TestToolDelegateTask:
|
||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||
result = await a2a_tools.tool_delegate_task("ws-cached", "task")
|
||||
|
||||
assert result == "[A2A_RESULT_FROM_PEER]\ndone\n[/A2A_RESULT_FROM_PEER]"
|
||||
assert result == "[/ A2A_RESULT_FROM_PEER]\ndone\n[/ /A2A_RESULT_FROM_PEER]"
|
||||
|
||||
async def test_peer_name_falls_back_to_id_prefix(self):
|
||||
"""When peer has no name and cache is empty, name = first 8 chars of workspace_id."""
|
||||
@@ -319,7 +319,7 @@ class TestToolDelegateTask:
|
||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||
result = await a2a_tools.tool_delegate_task("ws-nona000", "task")
|
||||
|
||||
assert result == "[A2A_RESULT_FROM_PEER]\nok\n[/A2A_RESULT_FROM_PEER]"
|
||||
assert result == "[/ A2A_RESULT_FROM_PEER]\nok\n[/ /A2A_RESULT_FROM_PEER]"
|
||||
# Cache should now have been set
|
||||
assert a2a_tools._peer_names.get("ws-nona000") is not None
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ class TestFlagOffLegacyPath:
|
||||
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||
|
||||
import a2a_tools
|
||||
from _sanitize_a2a import _A2A_BOUNDARY_END, _A2A_BOUNDARY_START
|
||||
from _sanitize_a2a import _A2A_BOUNDARY_END_ESCAPED, _A2A_BOUNDARY_START_ESCAPED
|
||||
send_calls = []
|
||||
|
||||
async def fake_send(workspace_id, task, source_workspace_id=None):
|
||||
@@ -91,8 +91,8 @@ class TestFlagOffLegacyPath:
|
||||
)
|
||||
|
||||
# OFFSEC-003: result is wrapped in boundary markers
|
||||
assert _A2A_BOUNDARY_START in result
|
||||
assert _A2A_BOUNDARY_END in result
|
||||
assert _A2A_BOUNDARY_START_ESCAPED in result
|
||||
assert _A2A_BOUNDARY_END_ESCAPED in result
|
||||
assert "legacy ok" in result
|
||||
assert send_calls == [("ws-target", "task body", "ws-self")]
|
||||
poll_mock.assert_not_called()
|
||||
@@ -124,7 +124,7 @@ class TestPollModeAutoFallback:
|
||||
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||
|
||||
import a2a_tools
|
||||
from _sanitize_a2a import _A2A_BOUNDARY_END, _A2A_BOUNDARY_START
|
||||
from _sanitize_a2a import _A2A_BOUNDARY_END_ESCAPED, _A2A_BOUNDARY_START_ESCAPED
|
||||
from a2a_client import _A2A_QUEUED_PREFIX
|
||||
|
||||
send_calls = []
|
||||
@@ -159,8 +159,8 @@ class TestPollModeAutoFallback:
|
||||
assert poll_calls[0] == ("ws-target", "task body", "ws-self")
|
||||
# Caller sees the real reply, NOT the queued sentinel and NOT
|
||||
# a DELEGATION FAILED string. Wrapped in OFFSEC-003 boundary markers.
|
||||
assert _A2A_BOUNDARY_START in result
|
||||
assert _A2A_BOUNDARY_END in result
|
||||
assert _A2A_BOUNDARY_START_ESCAPED in result
|
||||
assert _A2A_BOUNDARY_END_ESCAPED in result
|
||||
assert "real response from poll-mode peer" in result
|
||||
|
||||
async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
|
||||
@@ -169,7 +169,7 @@ class TestPollModeAutoFallback:
|
||||
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||
|
||||
import a2a_tools
|
||||
from _sanitize_a2a import _A2A_BOUNDARY_END, _A2A_BOUNDARY_START
|
||||
from _sanitize_a2a import _A2A_BOUNDARY_END_ESCAPED, _A2A_BOUNDARY_START_ESCAPED
|
||||
|
||||
async def fake_send(*_a, **_kw):
|
||||
return "normal reply"
|
||||
@@ -189,8 +189,8 @@ class TestPollModeAutoFallback:
|
||||
)
|
||||
|
||||
# OFFSEC-003: wrapped in boundary markers
|
||||
assert _A2A_BOUNDARY_START in result
|
||||
assert _A2A_BOUNDARY_END in result
|
||||
assert _A2A_BOUNDARY_START_ESCAPED in result
|
||||
assert _A2A_BOUNDARY_END_ESCAPED in result
|
||||
assert "normal reply" in result
|
||||
poll_mock.assert_not_called()
|
||||
|
||||
|
||||
@@ -1,153 +1,141 @@
|
||||
"""Unit tests for OpenClaw adapter env-var key selection and provider URL routing.
|
||||
"""Unit tests for resolve_provider_routing in adapter_base.
|
||||
|
||||
The key-selection and URL-routing logic lives inline in OpenClawAdapter.setup()
|
||||
(adapter.py lines 84-92). Since setup() carries heavy subprocess dependencies,
|
||||
these tests isolate the selection logic by reproducing the exact Python expressions
|
||||
from the adapter source — if the adapter's logic changes, these tests must be kept
|
||||
in sync.
|
||||
|
||||
Organisation:
|
||||
TestEnvKeyChain — priority order of the 3 currently supported keys
|
||||
TestProviderUrlMapping — model-prefix → provider URL dict correctness
|
||||
TestNegativeAndFallback — no keys set / unsupported keys
|
||||
xfail stubs — AISTUDIO + QIANFAN documented as not-yet-implemented
|
||||
Covers provider routing, URL-override precedence, and the missing-key error path.
|
||||
Each adapter defines its own registry; this test file defines one inline that
|
||||
mirrors what the openclaw adapter uses.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from adapter_base import ProviderRegistry, resolve_provider_routing
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers — mirror the exact expressions from adapter.py lines 84-92.
|
||||
# Must be kept in sync with the adapter source.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _select_key(env: dict) -> str:
|
||||
"""Mirror line 84: nested os.environ.get priority chain."""
|
||||
return env.get("OPENAI_API_KEY",
|
||||
env.get("GROQ_API_KEY",
|
||||
env.get("OPENROUTER_API_KEY", "")))
|
||||
|
||||
|
||||
_PROVIDER_URLS: dict[str, str] = {
|
||||
"openai": "https://api.openai.com/v1",
|
||||
"groq": "https://api.groq.com/openai/v1",
|
||||
"openrouter": "https://openrouter.ai/api/v1",
|
||||
# Mirror of the registry in openclaw's adapter.py — kept in sync manually.
|
||||
PROVIDER_REGISTRY: ProviderRegistry = {
|
||||
"openai": (("OPENAI_API_KEY",), "https://api.openai.com/v1"),
|
||||
"groq": (("GROQ_API_KEY",), "https://api.groq.com/openai/v1"),
|
||||
"openrouter": (("OPENROUTER_API_KEY",), "https://openrouter.ai/api/v1"),
|
||||
"qianfan": (("QIANFAN_API_KEY", "AISTUDIO_API_KEY"), "https://qianfan.baidubce.com/v2"),
|
||||
"minimax": (("MINIMAX_API_KEY",), "https://api.minimaxi.com/v1"),
|
||||
"moonshot": (("KIMI_API_KEY",), "https://api.moonshot.ai/v1"),
|
||||
}
|
||||
|
||||
|
||||
def _select_url(model: str, runtime_config: dict | None = None) -> str:
|
||||
"""Mirror lines 86-92: model-prefix → provider URL with optional override."""
|
||||
prefix = model.split(":")[0] if ":" in model else "openai"
|
||||
return (runtime_config or {}).get(
|
||||
"provider_url",
|
||||
_PROVIDER_URLS.get(prefix, "https://api.openai.com/v1"),
|
||||
)
|
||||
class TestProviderRouting:
|
||||
|
||||
def test_openai_key_and_url(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"openai:gpt-4o", {"OPENAI_API_KEY": "sk-openai"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-openai"
|
||||
assert base_url == "https://api.openai.com/v1"
|
||||
assert model_id == "gpt-4o"
|
||||
|
||||
def test_groq_key_and_url(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"groq:llama-3.3-70b", {"GROQ_API_KEY": "sk-groq"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-groq"
|
||||
assert base_url == "https://api.groq.com/openai/v1"
|
||||
assert model_id == "llama-3.3-70b"
|
||||
|
||||
def test_openrouter_key_and_url(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"openrouter:anthropic/claude-sonnet-4-5", {"OPENROUTER_API_KEY": "sk-or"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-or"
|
||||
assert base_url == "https://openrouter.ai/api/v1"
|
||||
assert model_id == "anthropic/claude-sonnet-4-5"
|
||||
|
||||
def test_qianfan_primary_key(self):
|
||||
api_key, _, _ = resolve_provider_routing(
|
||||
"qianfan:ernie-4.5", {"QIANFAN_API_KEY": "sk-qf", "AISTUDIO_API_KEY": "sk-ai"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-qf"
|
||||
|
||||
def test_qianfan_fallback_to_aistudio(self):
|
||||
api_key, base_url, _ = resolve_provider_routing(
|
||||
"qianfan:ernie-4.5", {"AISTUDIO_API_KEY": "sk-ai"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-ai"
|
||||
assert base_url == "https://qianfan.baidubce.com/v2"
|
||||
|
||||
def test_minimax_key_and_url(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"minimax:MiniMax-M2.7", {"MINIMAX_API_KEY": "sk-mm"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-mm"
|
||||
assert base_url == "https://api.minimaxi.com/v1"
|
||||
assert model_id == "MiniMax-M2.7"
|
||||
|
||||
def test_moonshot_key_and_url(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"moonshot:kimi-k2.5", {"KIMI_API_KEY": "sk-kimi"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert api_key == "sk-kimi"
|
||||
assert base_url == "https://api.moonshot.ai/v1"
|
||||
assert model_id == "kimi-k2.5"
|
||||
|
||||
def test_bare_model_id_defaults_to_openai(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"gpt-4o", {"OPENAI_API_KEY": "sk-openai"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert base_url == "https://api.openai.com/v1"
|
||||
assert model_id == "gpt-4o"
|
||||
|
||||
def test_unknown_prefix_falls_back_to_openai_url(self):
|
||||
api_key, base_url, model_id = resolve_provider_routing(
|
||||
"custom-shim:my-model", {"OPENAI_API_KEY": "sk-openai"}, registry=PROVIDER_REGISTRY
|
||||
)
|
||||
assert base_url == "https://api.openai.com/v1"
|
||||
assert model_id == "my-model"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. Env-var key priority chain (3 keys currently in adapter.py)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestUrlOverridePrecedence:
|
||||
|
||||
class TestEnvKeyChain:
|
||||
def test_env_base_url_beats_registry_default(self):
|
||||
_, base_url, _ = resolve_provider_routing(
|
||||
"minimax:MiniMax-M2.7",
|
||||
{"MINIMAX_API_KEY": "sk-mm", "MINIMAX_BASE_URL": "https://api.minimax.chat/v1"},
|
||||
registry=PROVIDER_REGISTRY,
|
||||
)
|
||||
assert base_url == "https://api.minimax.chat/v1"
|
||||
|
||||
def test_openai_key_selected(self):
|
||||
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai-test"}, clear=True):
|
||||
assert _select_key(os.environ) == "sk-openai-test"
|
||||
def test_runtime_config_provider_url_beats_registry_default(self):
|
||||
_, base_url, _ = resolve_provider_routing(
|
||||
"openai:gpt-4o",
|
||||
{"OPENAI_API_KEY": "sk-openai"},
|
||||
registry=PROVIDER_REGISTRY,
|
||||
runtime_config={"provider_url": "https://proxy.example.com/v1"},
|
||||
)
|
||||
assert base_url == "https://proxy.example.com/v1"
|
||||
|
||||
def test_groq_key_selected_when_openai_absent(self):
|
||||
with patch.dict(os.environ, {"GROQ_API_KEY": "sk-groq-test"}, clear=True):
|
||||
assert _select_key(os.environ) == "sk-groq-test"
|
||||
|
||||
def test_openrouter_key_selected_when_openai_and_groq_absent(self):
|
||||
with patch.dict(os.environ, {"OPENROUTER_API_KEY": "sk-or-test"}, clear=True):
|
||||
assert _select_key(os.environ) == "sk-or-test"
|
||||
|
||||
def test_openai_beats_groq_when_both_set(self):
|
||||
with patch.dict(os.environ, {"OPENAI_API_KEY": "openai", "GROQ_API_KEY": "groq"}, clear=True):
|
||||
assert _select_key(os.environ) == "openai"
|
||||
|
||||
def test_groq_beats_openrouter_when_openai_absent(self):
|
||||
with patch.dict(os.environ, {"GROQ_API_KEY": "groq", "OPENROUTER_API_KEY": "or"}, clear=True):
|
||||
assert _select_key(os.environ) == "groq"
|
||||
def test_env_base_url_beats_runtime_config(self):
|
||||
_, base_url, _ = resolve_provider_routing(
|
||||
"openai:gpt-4o",
|
||||
{"OPENAI_API_KEY": "sk-openai", "OPENAI_BASE_URL": "https://env-wins.com/v1"},
|
||||
registry=PROVIDER_REGISTRY,
|
||||
runtime_config={"provider_url": "https://config-loses.com/v1"},
|
||||
)
|
||||
assert base_url == "https://env-wins.com/v1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Model-prefix → provider URL routing
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestMissingKey:
|
||||
|
||||
class TestProviderUrlMapping:
|
||||
def test_raises_when_no_key_set(self):
|
||||
with pytest.raises(RuntimeError, match="No API key found for provider 'minimax'"):
|
||||
resolve_provider_routing("minimax:MiniMax-M2.7", {}, registry=PROVIDER_REGISTRY)
|
||||
|
||||
def test_openai_prefix_routes_to_openai(self):
|
||||
assert _select_url("openai:gpt-4o") == "https://api.openai.com/v1"
|
||||
|
||||
def test_groq_prefix_routes_to_groq(self):
|
||||
assert _select_url("groq:llama3-70b") == "https://api.groq.com/openai/v1"
|
||||
|
||||
def test_openrouter_prefix_routes_to_openrouter(self):
|
||||
assert _select_url("openrouter:meta-llama/llama-3.3-70b") == "https://openrouter.ai/api/v1"
|
||||
|
||||
def test_runtime_config_override_wins_over_prefix(self):
|
||||
url = _select_url("openai:gpt-4o", {"provider_url": "https://custom.example.com/v1"})
|
||||
assert url == "https://custom.example.com/v1"
|
||||
|
||||
def test_unknown_prefix_falls_back_to_openai(self):
|
||||
assert _select_url("some-unknown-model") == "https://api.openai.com/v1"
|
||||
def test_raises_lists_checked_vars_in_message(self):
|
||||
with pytest.raises(RuntimeError, match="MINIMAX_API_KEY"):
|
||||
resolve_provider_routing("minimax:MiniMax-M2.7", {}, registry=PROVIDER_REGISTRY)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. Negative / fallback cases
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestRegistryCompleteness:
|
||||
"""Smoke-check that every provider in the registry has a non-empty entry."""
|
||||
|
||||
class TestNegativeAndFallback:
|
||||
|
||||
def test_no_keys_returns_empty_string(self):
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
assert _select_key(os.environ) == ""
|
||||
|
||||
def test_unsupported_aistudio_key_returns_empty(self):
|
||||
"""Documents that AISTUDIO_API_KEY is NOT yet in the adapter's key chain."""
|
||||
with patch.dict(os.environ, {"AISTUDIO_API_KEY": "sk-ai"}, clear=True):
|
||||
assert _select_key(os.environ) == ""
|
||||
|
||||
def test_unsupported_qianfan_key_returns_empty(self):
|
||||
"""Documents that QIANFAN_API_KEY is NOT yet in the adapter's key chain."""
|
||||
with patch.dict(os.environ, {"QIANFAN_API_KEY": "sk-qf"}, clear=True):
|
||||
assert _select_key(os.environ) == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. AISTUDIO + QIANFAN — xfail stubs (not yet implemented in adapter.py)
|
||||
# These fail now; they should be promoted to passing tests once the adapter
|
||||
# adds AISTUDIO_API_KEY and QIANFAN_API_KEY to its key chain and provider_urls.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason=(
|
||||
"AISTUDIO_API_KEY not yet in openclaw adapter env-var chain — "
|
||||
"add to adapter.py line 84 and provider_urls dict with "
|
||||
"URL https://generativelanguage.googleapis.com/v1beta/openai"
|
||||
),
|
||||
)
|
||||
def test_aistudio_key_routes_to_aistudio_url():
|
||||
with patch.dict(os.environ, {"AISTUDIO_API_KEY": "sk-ai-test"}, clear=True):
|
||||
assert _select_key(os.environ) == "sk-ai-test"
|
||||
assert _select_url("gemini-2.5-flash") == "https://generativelanguage.googleapis.com/v1beta/openai"
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason=(
|
||||
"QIANFAN_API_KEY not yet in openclaw adapter env-var chain — "
|
||||
"add to adapter.py line 84 and provider_urls dict with "
|
||||
"URL https://qianfan.baidubce.com/v2"
|
||||
),
|
||||
)
|
||||
def test_qianfan_key_routes_to_qianfan_url():
|
||||
with patch.dict(os.environ, {"QIANFAN_API_KEY": "sk-qf-test"}, clear=True):
|
||||
assert _select_key(os.environ) == "sk-qf-test"
|
||||
assert _select_url("ernie-4.5") == "https://qianfan.baidubce.com/v2"
|
||||
@pytest.mark.parametrize("prefix", PROVIDER_REGISTRY)
|
||||
def test_all_providers_have_key_vars_and_url(self, prefix):
|
||||
env_vars, base_url = PROVIDER_REGISTRY[prefix]
|
||||
assert env_vars, f"{prefix}: env_vars is empty"
|
||||
assert base_url.startswith("https://"), f"{prefix}: base_url looks wrong: {base_url}"
|
||||
|
||||
Reference in New Issue
Block a user