Compare commits
65 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| acde1eb676 | |||
| f4b4036a68 | |||
| b0f66735c4 | |||
| 69bec10321 | |||
| 4e84dffd9e | |||
| fed6352b58 | |||
| cace2eb7d3 | |||
| 231fb5ddab | |||
| 01087ddbe7 | |||
| 3112f394eb | |||
| 7fb0da3ed5 | |||
| 805486e36e | |||
| bad6699320 | |||
| 8c3234e4d2 | |||
| 741bb11059 | |||
| 3a82e1f1b1 | |||
| f7183cc0d8 | |||
| 0253cdeb47 | |||
| 65f4ffb0ac | |||
| 6f98ac062e | |||
| 992ccfbd5e | |||
| 086b479dca | |||
| 51284546d2 | |||
| 9b36c9eb7a | |||
| adaaa2a1f8 | |||
| 37739e3dd8 | |||
| 1c76713d71 | |||
| e92468db13 | |||
| be8424c350 | |||
| a7caaa6bd0 | |||
| 3e28bf5943 | |||
| a356bc94f3 | |||
| 9981a5099a | |||
| 07d3dcd988 | |||
| 3ff613e3ad | |||
| 96c37cb098 | |||
| e123d07898 | |||
| 22fbf43580 | |||
| a47307969c | |||
| ff2557d899 | |||
| 119743d0de | |||
| c3806cd890 | |||
| 55e8c2d347 | |||
| 07b465f13d | |||
| eb21a02b6d | |||
| 498ce4e287 | |||
| 7081a8e900 | |||
| da4b86a159 | |||
| 81d864f4bc | |||
| c9795a6c4d | |||
| f5dc55f1d1 | |||
| fd92df486c | |||
| fc7498fef0 | |||
| 51dcca592d | |||
| 27c1e18e98 | |||
| 73502db9f4 | |||
| 4f85ef5209 | |||
| def18f28fa | |||
| 8fc27f4d69 | |||
| 6137657704 | |||
| 704a8ab7de | |||
| 7f59b7fd35 | |||
| 660fc20124 | |||
| 07457ad556 | |||
| e9c4f23ae2 |
@@ -61,6 +61,7 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
@@ -89,6 +90,19 @@ API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else ""
|
||||
# match by exact title without parsing.
|
||||
TITLE_PREFIX = "[main-red]"
|
||||
|
||||
# Settling window (seconds) between initial red detection and the
|
||||
# pre-file recheck. The recheck filters out the two largest false-
|
||||
# positive classes seen in mc#1597..1630 (task #394, 2026-05-21):
|
||||
# 1. HEAD moved on (a new commit landed mid-tick) — the prior red SHA
|
||||
# is no longer authoritative; let the next cron tick re-evaluate.
|
||||
# 2. Combined status recovered on the SAME SHA (transient
|
||||
# cancel-cascade rolled forward to success on retry).
|
||||
# 90s is well below the hourly cron cadence; a real failure that
|
||||
# persists past it is the one we want surfaced.
|
||||
# Override with WATCHDOG_RECHECK_DELAY_SECS for tests / local probes
|
||||
# (the test suite stubs time.sleep to a no-op).
|
||||
RECHECK_DELAY_SECS = int(_env("WATCHDOG_RECHECK_DELAY_SECS", default="90"))
|
||||
|
||||
|
||||
def _require_runtime_env() -> None:
|
||||
"""Enforce env contract — called from `main()` only.
|
||||
@@ -172,6 +186,49 @@ def api(
|
||||
return status, {"_raw": raw.decode("utf-8", errors="replace")}
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# action_run.status resolver — extensibility hook for task #394.
|
||||
# --------------------------------------------------------------------------
|
||||
def _resolve_action_run_status(target_url: str) -> int | None:
|
||||
"""Resolve the underlying Gitea `action_run.status` integer for the
|
||||
run referenced by `target_url`, returning None if the resolver
|
||||
cannot reach an authoritative source from the runner.
|
||||
|
||||
Canonical Gitea 1.22.6 enum (per `models/actions/status.go` +
|
||||
`reference_gitea_action_status_enum_corrected_2026_05_19`):
|
||||
1=Success, 2=Failure, 3=Cancelled, 4=Skipped,
|
||||
5=Waiting, 6=Running, 7=Blocked
|
||||
Only `status == 2` is a real defect; status=3 is cancel-cascade and
|
||||
status=1 is an emission artifact (Gitea wrote a 'failure' commit_status
|
||||
row for a run that actually succeeded — observed empirically on
|
||||
`publish-canvas-image` jobs at SHAs in mc#1597..1630).
|
||||
|
||||
CURRENT STATE (2026-05-20, verified): Gitea 1.22.6 exposes NO REST
|
||||
endpoint for `action_run.status`. Probed:
|
||||
/api/v1/repos/{o}/{r}/actions/runs/{id} → HTTP 404
|
||||
/api/v1/repos/{o}/{r}/actions/jobs/{id} → HTTP 404
|
||||
/api/v1/repos/{o}/{r}/actions/tasks/{id} → HTTP 404
|
||||
/swagger.v1.json paths containing 'actions' → secrets+variables+runners only
|
||||
The SPA backend (`/{repo}/actions/runs/{id}/jobs/{idx}` POST) requires
|
||||
a session CSRF token, unreachable from a runner. The only authoritative
|
||||
source today is direct DB access (`mol_action_status` on op-host,
|
||||
`docker exec molecule-postgres-1 psql ...`), which the runner cannot
|
||||
reach.
|
||||
|
||||
Therefore: this hook returns None on every call. Callers MUST fall
|
||||
back to the description-string filter (existing) plus the HEAD
|
||||
recheck (this PR). When a future Gitea release (>=1.23 expected) or
|
||||
an op-host proxy exposes the endpoint, replace the body of this
|
||||
function with an `api(...)` call — the caller contract is stable.
|
||||
|
||||
See also:
|
||||
- `reference_chronic_red_sweep_cancelled_vs_failed_filter`
|
||||
- `feedback_gitea_status_enum_use_helper_not_raw_int`
|
||||
"""
|
||||
_ = target_url # noqa: F841 — intentional placeholder
|
||||
return None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Gitea reads
|
||||
# --------------------------------------------------------------------------
|
||||
@@ -614,6 +671,56 @@ def run_once(*, dry_run: bool = False) -> int:
|
||||
}
|
||||
|
||||
if red:
|
||||
# HEAD recheck (task #394 — guards mc#1597..1630 false-positive
|
||||
# cluster). After the initial detection, wait RECHECK_DELAY_SECS
|
||||
# (default 90s; tests stub time.sleep) and re-evaluate:
|
||||
#
|
||||
# 1. Re-fetch HEAD SHA. If HEAD moved, a new commit landed
|
||||
# mid-tick — the prior red SHA is no longer authoritative
|
||||
# and the next cron run will re-evaluate against the new
|
||||
# HEAD. Skip-file.
|
||||
#
|
||||
# 2. If HEAD unchanged, re-fetch the combined status. If it
|
||||
# recovered (combined state no longer in {failure,error}
|
||||
# after the cancel-cascade filter), a transient retry
|
||||
# rolled the run forward. Skip-file.
|
||||
#
|
||||
# Both paths emit a Loki event distinguishable from the real
|
||||
# `main_red_detected` so obs queries can track filter activity.
|
||||
# The settling window is well below the hourly cron cadence —
|
||||
# genuine failures persist past it and are surfaced normally.
|
||||
time.sleep(RECHECK_DELAY_SECS)
|
||||
|
||||
recheck_sha = get_head_sha(WATCH_BRANCH)
|
||||
if recheck_sha != sha:
|
||||
emit_loki_event("main_red_skipped_head_drift", sha, [])
|
||||
print(
|
||||
f"::notice::skip-file (HEAD moved): initial red at "
|
||||
f"{sha[:10]} but HEAD is now {recheck_sha[:10]} on "
|
||||
f"{WATCH_BRANCH}; next cron tick will re-evaluate."
|
||||
)
|
||||
return 0
|
||||
|
||||
recheck_status = get_combined_status(sha)
|
||||
recheck_red, recheck_failed = is_red(recheck_status)
|
||||
if not recheck_red:
|
||||
emit_loki_event("main_red_skipped_recovered", sha, [])
|
||||
print(
|
||||
f"::notice::skip-file (recovered after settling): "
|
||||
f"combined state at {sha[:10]} flipped to "
|
||||
f"{recheck_status.get('state')!r} on recheck; "
|
||||
f"initial red was a transient cancel-cascade."
|
||||
)
|
||||
return 0
|
||||
|
||||
# Still red after settling — file/update. Use the recheck data
|
||||
# as authoritative so the issue body reflects the latest state.
|
||||
failed = recheck_failed
|
||||
debug["recheck_combined_state"] = recheck_status.get("state")
|
||||
debug["recheck_failed_contexts"] = [
|
||||
s.get("context") for s in failed
|
||||
]
|
||||
|
||||
failed_ctxs = [s.get("context") for s in failed if s.get("context")]
|
||||
emit_loki_event("main_red_detected", sha, failed_ctxs)
|
||||
print(f"::warning::main is RED at {sha[:10]} on {WATCH_BRANCH}: "
|
||||
|
||||
@@ -104,10 +104,13 @@ if [ "${SOP_REFIRE_DISABLE_RATE_LIMIT:-}" != "1" ]; then
|
||||
fi
|
||||
fi
|
||||
|
||||
# 3. Invoke sop-tier-check.sh with the env it expects. Capture exit code.
|
||||
# The canonical script reads tier label, walks approving reviewers, and
|
||||
# evaluates the AND-composition expression — we want the SAME gate, not
|
||||
# a different gate.
|
||||
# 3. Invoke sop-tier-check.sh with the env it expects.
|
||||
# The canonical workflow intentionally fail-opens the job conclusion
|
||||
# (`bash .gitea/scripts/sop-tier-check.sh || true`) while Gitea branch
|
||||
# protection enforces reviewer approvals separately. Keep the refire path
|
||||
# aligned with that workflow status behavior; otherwise /refire-tier-check can
|
||||
# post a hard failure that the canonical pull_request_target workflow would
|
||||
# not publish.
|
||||
#
|
||||
# SOP_REFIRE_TIER_CHECK_SCRIPT env var lets tests substitute a mock —
|
||||
# sop-tier-check.sh uses bash 4+ associative arrays which trigger a known
|
||||
@@ -123,7 +126,6 @@ fi
|
||||
|
||||
# Re-invoke. Pipe stdout/stderr through so the runner log shows the
|
||||
# tier-check decision inline.
|
||||
set +e
|
||||
GITEA_TOKEN="$GITEA_TOKEN" \
|
||||
GITEA_HOST="$GITEA_HOST" \
|
||||
REPO="$REPO" \
|
||||
@@ -131,9 +133,8 @@ GITEA_TOKEN="$GITEA_TOKEN" \
|
||||
PR_AUTHOR="$PR_AUTHOR" \
|
||||
SOP_DEBUG="${SOP_DEBUG:-0}" \
|
||||
SOP_LEGACY_CHECK="${SOP_LEGACY_CHECK:-0}" \
|
||||
bash "$SCRIPT"
|
||||
TIER_EXIT=$?
|
||||
set -e
|
||||
bash "$SCRIPT" || true
|
||||
TIER_EXIT=0
|
||||
debug "sop-tier-check.sh exit=$TIER_EXIT"
|
||||
|
||||
# 4. POST the resulting status.
|
||||
|
||||
@@ -47,7 +47,9 @@ What this script does, per `.gitea/workflows/status-reaper.yml` invocation:
|
||||
Parse context as `<workflow_name> / <job_name> (push)`.
|
||||
Look up workflow_name in the trigger map:
|
||||
- missing → log ::notice:: and skip (conservative).
|
||||
- has_push_trigger=True → preserve (real defect signal).
|
||||
- has_push_trigger=True and description == "Has been cancelled"
|
||||
→ compensate cancelled/superseded push noise.
|
||||
- has_push_trigger=True otherwise → preserve (real defect signal).
|
||||
- has_push_trigger=False → POST a compensating
|
||||
`state=success` status to /statuses/{sha} with the same
|
||||
context (Gitea de-dups by context) and a description
|
||||
@@ -141,6 +143,11 @@ PR_SHADOW_COMPENSATION_DESCRIPTION = (
|
||||
"shadowed by successful push status on same SHA; see "
|
||||
".gitea/scripts/status-reaper.py)"
|
||||
)
|
||||
CANCELLED_PUSH_COMPENSATION_DESCRIPTION = (
|
||||
"Compensated by status-reaper (push run was cancelled/superseded; "
|
||||
"Gitea 1.22.6 reports cancelled runs as failure statuses)"
|
||||
)
|
||||
CANCELLED_DESCRIPTION = "Has been cancelled"
|
||||
|
||||
# Context suffix the reaper acts on. Gitea hardcodes this for ALL
|
||||
# default-branch workflow runs.
|
||||
@@ -476,7 +483,7 @@ def reap(
|
||||
{compensated, preserved_real_push, preserved_unknown,
|
||||
preserved_non_failure, preserved_non_push_suffix,
|
||||
preserved_unparseable, compensated_pr_shadowed_by_push_success,
|
||||
preserved_pr_without_push_success,
|
||||
preserved_pr_without_push_success, compensated_cancelled_push,
|
||||
compensated_contexts: [<context>, ...]}
|
||||
|
||||
`compensated_contexts` is rev2-added so `reap_branch` can build
|
||||
@@ -490,6 +497,7 @@ def reap(
|
||||
"preserved_non_push_suffix": 0,
|
||||
"preserved_unparseable": 0,
|
||||
"compensated_pr_shadowed_by_push_success": 0,
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_contexts": [],
|
||||
}
|
||||
@@ -567,8 +575,27 @@ def reap(
|
||||
counters["preserved_unknown"] += 1
|
||||
continue
|
||||
|
||||
if (s.get("description") or "").strip() == CANCELLED_DESCRIPTION:
|
||||
# Gitea 1.22.6 maps cancelled action runs to failure commit
|
||||
# statuses. During merge bursts, older push runs can be
|
||||
# superseded and cancelled even though a newer run for the
|
||||
# same branch is the real signal. Compensate only the exact
|
||||
# Gitea cancellation description; real push failures remain red.
|
||||
post_compensating_status(
|
||||
sha,
|
||||
context,
|
||||
s.get("target_url"),
|
||||
description=CANCELLED_PUSH_COMPENSATION_DESCRIPTION,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
counters["compensated"] += 1
|
||||
counters["compensated_cancelled_push"] += 1
|
||||
counters["compensated_contexts"].append(context)
|
||||
continue
|
||||
|
||||
if workflow_trigger_map[workflow_name]:
|
||||
# Real push trigger → real defect signal. Preserve.
|
||||
# Real push trigger with a non-cancelled failure description
|
||||
# remains a defect signal. Preserve.
|
||||
counters["preserved_real_push"] += 1
|
||||
continue
|
||||
|
||||
@@ -674,6 +701,7 @@ def reap_branch(
|
||||
"preserved_non_push_suffix": 0,
|
||||
"preserved_unparseable": 0,
|
||||
"compensated_pr_shadowed_by_push_success": 0,
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_per_sha": {},
|
||||
"skipped": True,
|
||||
@@ -689,6 +717,7 @@ def reap_branch(
|
||||
"preserved_non_push_suffix": 0,
|
||||
"preserved_unparseable": 0,
|
||||
"compensated_pr_shadowed_by_push_success": 0,
|
||||
"compensated_cancelled_push": 0,
|
||||
"preserved_pr_without_push_success": 0,
|
||||
"compensated_per_sha": {},
|
||||
}
|
||||
@@ -728,6 +757,7 @@ def reap_branch(
|
||||
"preserved_non_push_suffix",
|
||||
"preserved_unparseable",
|
||||
"compensated_pr_shadowed_by_push_success",
|
||||
"compensated_cancelled_push",
|
||||
"preserved_pr_without_push_success",
|
||||
):
|
||||
aggregate[key] += per_sha[key]
|
||||
|
||||
@@ -6,9 +6,10 @@
|
||||
# T1: PR open + APPROVED via tier:low → script invokes sop-tier-check
|
||||
# and POSTs status=success.
|
||||
# T2: PR open + missing tier label → sop-tier-check exits non-zero;
|
||||
# refire POSTs status=failure (description mentions failure).
|
||||
# refire still POSTs status=success, matching the canonical
|
||||
# pull_request_target workflow's fail-open job conclusion.
|
||||
# T3: PR open + tier:low but NO approving reviews → sop-tier-check
|
||||
# exits non-zero; refire POSTs status=failure.
|
||||
# exits non-zero; refire still POSTs status=success for the same reason.
|
||||
# T4: PR CLOSED → refire exits 0 with no status POST (no-op on closed).
|
||||
# T5: Rate-limit — recent status update within 30s → refire skips,
|
||||
# no new POST.
|
||||
@@ -32,7 +33,7 @@ THIS_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
SCRIPT_DIR="$(cd "$THIS_DIR/.." && pwd)"
|
||||
WORKFLOW_DIR="$(cd "$THIS_DIR/../../workflows" && pwd)"
|
||||
WORKFLOW="$WORKFLOW_DIR/sop-tier-refire.yml"
|
||||
DISPATCH_WORKFLOW="$WORKFLOW_DIR/review-refire-comments.yml"
|
||||
DISPATCH_WORKFLOW="$WORKFLOW_DIR/sop-checklist.yml"
|
||||
SCRIPT="$SCRIPT_DIR/sop-tier-refire.sh"
|
||||
|
||||
PASS=0
|
||||
@@ -88,7 +89,7 @@ assert_file_exists() {
|
||||
echo
|
||||
echo "== existence =="
|
||||
assert_file_exists "workflow file exists" "$WORKFLOW"
|
||||
assert_file_exists "dispatcher workflow file exists" "$DISPATCH_WORKFLOW"
|
||||
assert_file_exists "SSOT dispatcher workflow file exists" "$DISPATCH_WORKFLOW"
|
||||
assert_file_exists "script file exists" "$SCRIPT"
|
||||
if [ "$FAIL" -gt 0 ]; then
|
||||
echo
|
||||
@@ -133,15 +134,15 @@ else
|
||||
fi
|
||||
|
||||
DISPATCH_PARSE_OUT=$(python3 -c 'import sys,yaml;yaml.safe_load(open(sys.argv[1]).read());print("ok")' "$DISPATCH_WORKFLOW" 2>&1 || true)
|
||||
assert_eq "T6e dispatcher workflow parses as YAML" "ok" "$DISPATCH_PARSE_OUT"
|
||||
assert_eq "T6e SSOT dispatcher workflow parses as YAML" "ok" "$DISPATCH_PARSE_OUT"
|
||||
DISPATCH_CONTENT=$(cat "$DISPATCH_WORKFLOW")
|
||||
assert_contains "T6f dispatcher listens on issue_comment" \
|
||||
assert_contains "T6f SSOT dispatcher listens on issue_comment" \
|
||||
"issue_comment" "$DISPATCH_CONTENT"
|
||||
assert_contains "T6g dispatcher handles /qa-recheck" \
|
||||
assert_contains "T6g SSOT dispatcher handles /qa-recheck" \
|
||||
"/qa-recheck" "$DISPATCH_CONTENT"
|
||||
assert_contains "T6h dispatcher handles /security-recheck" \
|
||||
assert_contains "T6h SSOT dispatcher handles /security-recheck" \
|
||||
"/security-recheck" "$DISPATCH_CONTENT"
|
||||
assert_contains "T6i dispatcher handles /refire-tier-check" \
|
||||
assert_contains "T6i SSOT dispatcher handles /refire-tier-check" \
|
||||
"/refire-tier-check" "$DISPATCH_CONTENT"
|
||||
|
||||
# T1-T5 — script behavior against a local Gitea-fixture
|
||||
@@ -245,34 +246,21 @@ assert_contains "T1 POST context is sop-tier-check / tier-check" \
|
||||
'"context": "sop-tier-check / tier-check (pull_request)"' "$POSTED"
|
||||
assert_contains "T1 description names commenter" "test-runner" "$POSTED"
|
||||
|
||||
# T2: missing tier label → tier-check fails → failure status POSTed
|
||||
# T2: missing tier label → tier-check fails internally, but refire status
|
||||
# matches the canonical workflow's fail-open job conclusion.
|
||||
run_scenario "T2_no_tier_label" "fail_no_label"
|
||||
RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
POSTED=$(cat "$FIX_STATE_DIR/posted_statuses.jsonl" 2>/dev/null || true)
|
||||
# tier-check.sh exits 1; refire script forwards that exit, so RC != 0
|
||||
if [ "$RC" -ne 0 ]; then
|
||||
echo " PASS T2 exit code non-zero (got $RC)"
|
||||
PASS=$((PASS + 1))
|
||||
else
|
||||
echo " FAIL T2 exit code should be non-zero, got 0"
|
||||
FAIL=$((FAIL + 1))
|
||||
FAILED_TESTS="${FAILED_TESTS} T2_rc"
|
||||
fi
|
||||
assert_contains "T2 POSTed state=failure" '"state": "failure"' "$POSTED"
|
||||
assert_eq "T2 exit code 0 (canonical fail-open)" "0" "$RC"
|
||||
assert_contains "T2 POSTed state=success" '"state": "success"' "$POSTED"
|
||||
|
||||
# T3: tier:low present but ZERO approving reviews → failure
|
||||
# T3: tier:low present but ZERO approving reviews → internal tier check fails,
|
||||
# refire status remains aligned with the canonical workflow.
|
||||
run_scenario "T3_no_approvals" "fail_no_approvals"
|
||||
RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
POSTED=$(cat "$FIX_STATE_DIR/posted_statuses.jsonl" 2>/dev/null || true)
|
||||
if [ "$RC" -ne 0 ]; then
|
||||
echo " PASS T3 exit code non-zero (got $RC)"
|
||||
PASS=$((PASS + 1))
|
||||
else
|
||||
echo " FAIL T3 exit code should be non-zero, got 0"
|
||||
FAIL=$((FAIL + 1))
|
||||
FAILED_TESTS="${FAILED_TESTS} T3_rc"
|
||||
fi
|
||||
assert_contains "T3 POSTed state=failure" '"state": "failure"' "$POSTED"
|
||||
assert_eq "T3 exit code 0 (canonical fail-open)" "0" "$RC"
|
||||
assert_contains "T3 POSTed state=success" '"state": "success"' "$POSTED"
|
||||
|
||||
# T4: closed PR — refire is a no-op (no POST, exit 0)
|
||||
run_scenario "T4_closed" "pass"
|
||||
|
||||
+34
-34
@@ -98,10 +98,10 @@ jobs:
|
||||
--base-ref "$PR_BASE_REF" \
|
||||
--push-before "${GITHUB_EVENT_BEFORE:-$PUSH_BEFORE}"
|
||||
|
||||
# Platform (Go) — Go build/vet/test/lint + coverage gates. The always-run
|
||||
# + per-step gating shape preserves the GitHub-side required-check name
|
||||
# contract (so when this Gitea port becomes a required check in Phase 4,
|
||||
# the name match works on PRs that don't touch workspace-server/).
|
||||
# Platform (Go) — Go build/vet/test/lint + coverage gates. The job always
|
||||
# emits the required context, but expensive steps are path-scoped on every
|
||||
# event so docs/E2E/Canvas-only main pushes do not block deploy on unrelated
|
||||
# Go bootstrap work.
|
||||
platform-build:
|
||||
name: Platform (Go)
|
||||
needs: changes
|
||||
@@ -125,29 +125,29 @@ jobs:
|
||||
run:
|
||||
working-directory: workspace-server
|
||||
steps:
|
||||
- if: ${{ github.event_name == 'pull_request' && needs.changes.outputs.platform != 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform != 'true' }}
|
||||
working-directory: .
|
||||
run: echo "No workspace-server/** changes on this PR — Platform (Go) gate satisfied without running Go build/test/lint."
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
run: echo "No workspace-server/** changes — Platform (Go) gate satisfied without running Go build/test/lint."
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
|
||||
with:
|
||||
go-version: 'stable'
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
run: go mod download
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
run: go build ./cmd/server
|
||||
# CLI (molecli) moved to standalone repo: git.moleculesai.app/molecule-ai/molecule-cli
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
run: go vet ./...
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
name: Install golangci-lint
|
||||
run: go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.12.2
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
name: Run golangci-lint
|
||||
run: $(go env GOPATH)/bin/golangci-lint run --timeout 3m ./...
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
name: Diagnostic — per-package verbose 60s
|
||||
run: |
|
||||
set +e
|
||||
@@ -163,7 +163,7 @@ jobs:
|
||||
echo "::endgroup::"
|
||||
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
|
||||
continue-on-error: true
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
name: Run tests with race detection and coverage
|
||||
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
|
||||
# full ./... suite with race detection + coverage. A 10m per-step timeout
|
||||
@@ -171,7 +171,7 @@ jobs:
|
||||
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
|
||||
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
|
||||
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
name: Per-file coverage report
|
||||
# Advisory — lists every source file with its coverage so reviewers
|
||||
# can see at-a-glance where gaps are. Sorted ascending so the worst
|
||||
@@ -185,7 +185,7 @@ jobs:
|
||||
END {for (f in s) printf "%6.1f%% %s\n", s[f]/c[f], f}' \
|
||||
| sort -n
|
||||
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.platform == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.platform == 'true' }}
|
||||
name: Check coverage thresholds
|
||||
# Enforces two gates from #1823 Layer 1:
|
||||
# 1. Total floor (25% — ratchet plan in COVERAGE_FLOOR.md).
|
||||
@@ -282,20 +282,20 @@ jobs:
|
||||
run:
|
||||
working-directory: canvas
|
||||
steps:
|
||||
- if: ${{ github.event_name == 'pull_request' && needs.changes.outputs.canvas != 'true' }}
|
||||
- if: ${{ needs.changes.outputs.canvas != 'true' }}
|
||||
working-directory: .
|
||||
run: echo "No canvas/** changes on this PR — Canvas (Next.js) gate satisfied without running npm build/test."
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.canvas == 'true' }}
|
||||
run: echo "No canvas/** changes — Canvas (Next.js) gate satisfied without running npm build/test."
|
||||
- if: ${{ needs.changes.outputs.canvas == 'true' }}
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.canvas == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.canvas == 'true' }}
|
||||
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
|
||||
with:
|
||||
node-version: '22'
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.canvas == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.canvas == 'true' }}
|
||||
run: npm ci --include=optional --prefer-offline
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.canvas == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.canvas == 'true' }}
|
||||
run: npm run build
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.canvas == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.canvas == 'true' }}
|
||||
name: Run tests with coverage
|
||||
# Coverage instrumentation is configured in canvas/vitest.config.ts
|
||||
# (provider: v8, reporters: text + html + json-summary). Step 2 of
|
||||
@@ -304,7 +304,7 @@ jobs:
|
||||
# tracked in #1815) after the team sees what current coverage is.
|
||||
run: npx vitest run --coverage
|
||||
- name: Upload coverage summary as artifact
|
||||
if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.canvas == 'true' }}
|
||||
if: ${{ needs.changes.outputs.canvas == 'true' }}
|
||||
# Pinned to v3 for Gitea act_runner v0.6 compatibility — v4+ uses
|
||||
# the GHES 3.10+ artifact protocol that Gitea 1.22.x does NOT
|
||||
# implement, surfacing as `GHESNotSupportedError: @actions/artifact
|
||||
@@ -318,7 +318,7 @@ jobs:
|
||||
retention-days: 7
|
||||
if-no-files-found: warn
|
||||
|
||||
# Shellcheck (E2E scripts) — required check, always runs.
|
||||
# Shellcheck (E2E scripts) — required context, path-scoped heavy steps.
|
||||
shellcheck:
|
||||
name: Shellcheck (E2E scripts)
|
||||
needs: changes
|
||||
@@ -326,11 +326,11 @@ jobs:
|
||||
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
|
||||
continue-on-error: false
|
||||
steps:
|
||||
- if: ${{ github.event_name == 'pull_request' && needs.changes.outputs.scripts != 'true' }}
|
||||
run: echo "No tests/e2e, scripts, or infra/scripts changes on this PR — Shellcheck gate satisfied without running script checks."
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.scripts == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.scripts != 'true' }}
|
||||
run: echo "No tests/e2e, scripts, or infra/scripts changes — Shellcheck gate satisfied without running script checks."
|
||||
- if: ${{ needs.changes.outputs.scripts == 'true' }}
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.scripts == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.scripts == 'true' }}
|
||||
name: Run shellcheck on tests/e2e/*.sh and infra/scripts/*.sh
|
||||
# shellcheck is pre-installed on ubuntu-latest runners (via apt).
|
||||
# infra/scripts/ is included because setup.sh + nuke.sh gate the
|
||||
@@ -341,16 +341,16 @@ jobs:
|
||||
find tests/e2e infra/scripts -type f -name '*.sh' -print0 \
|
||||
| xargs -0 shellcheck --severity=warning
|
||||
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.scripts == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.scripts == 'true' }}
|
||||
name: Lint cleanup-trap hygiene (RFC #2873)
|
||||
run: bash tests/e2e/lint_cleanup_traps.sh
|
||||
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.scripts == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.scripts == 'true' }}
|
||||
name: Run E2E bash unit tests (no live infra)
|
||||
run: |
|
||||
bash tests/e2e/test_model_slug.sh
|
||||
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.scripts == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.scripts == 'true' }}
|
||||
name: Test ECR promote-tenant-image script (mock-driven, no live infra)
|
||||
# Covers scripts/promote-tenant-image.sh — the codified
|
||||
# :staging-latest → :latest ECR promote + tenant fleet redeploy
|
||||
@@ -360,7 +360,7 @@ jobs:
|
||||
run: |
|
||||
bash scripts/test-promote-tenant-image.sh
|
||||
|
||||
- if: ${{ github.event_name != 'pull_request' || needs.changes.outputs.scripts == 'true' }}
|
||||
- if: ${{ needs.changes.outputs.scripts == 'true' }}
|
||||
name: Shellcheck promote-tenant-image script
|
||||
# scripts/ is excluded from the bulk shellcheck pass above (legacy
|
||||
# SC3040/SC3043 cleanup pending). Run shellcheck explicitly on
|
||||
|
||||
@@ -118,7 +118,7 @@ jobs:
|
||||
timeout-minutes: 20
|
||||
env:
|
||||
# claude-code default: cold-start ~5 min (comparable to langgraph),
|
||||
# but uses MiniMax-M2.7-highspeed via the template's third-party-
|
||||
# but uses MiniMax-M2 via the template's third-party-
|
||||
# Anthropic-compat path (workspace-configs-templates/claude-code-
|
||||
# default/config.yaml:64-69). MiniMax is ~5-10x cheaper than
|
||||
# gpt-4.1-mini per token AND avoids the recurring OpenAI quota-
|
||||
@@ -131,9 +131,9 @@ jobs:
|
||||
# on the per-runtime default ("sonnet" → routes to direct
|
||||
# Anthropic, defeats the cost saving). Operators can override
|
||||
# via workflow_dispatch by setting a different E2E_MODEL_SLUG
|
||||
# input if they need to exercise a specific model. M2.7-highspeed
|
||||
# is "Token Plan only" but cheap-per-token and fast.
|
||||
E2E_MODEL_SLUG: ${{ github.event.inputs.model_slug || 'MiniMax-M2.7-highspeed' }}
|
||||
# input if they need to exercise a specific model. MiniMax-M2 is the
|
||||
# stable staging MiniMax path used by the full-SaaS smoke.
|
||||
E2E_MODEL_SLUG: ${{ github.event.inputs.model_slug || 'MiniMax-M2' }}
|
||||
# Bound to 10 min so a stuck provision fails the run instead of
|
||||
# holding up the next cron firing. 15-min default in the script
|
||||
# is for the on-PR full lifecycle where we have more headroom.
|
||||
@@ -145,6 +145,11 @@ jobs:
|
||||
E2E_KEEP_ORG: ${{ github.event.inputs.keep_org == 'true' && '1' || '' }}
|
||||
MOLECULE_CP_URL: ${{ vars.STAGING_CP_URL || 'https://staging-api.moleculesai.app' }}
|
||||
MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_DEFAULT_REGION: us-east-2
|
||||
E2E_AWS_LEAK_CHECK: required
|
||||
E2E_AWS_TERMINATE_LEAKS: '1'
|
||||
# MiniMax key is the canary's PRIMARY auth path. claude-code
|
||||
# template's `minimax` provider routes ANTHROPIC_BASE_URL to
|
||||
# api.minimax.io/anthropic and reads MINIMAX_API_KEY at boot.
|
||||
@@ -185,6 +190,12 @@ jobs:
|
||||
echo "::error::Set it at Settings → Secrets and Variables → Actions; pull from staging-CP's CP_ADMIN_API_TOKEN env in Railway."
|
||||
exit 1
|
||||
fi
|
||||
for var in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY; do
|
||||
if [ -z "${!var:-}" ]; then
|
||||
echo "::error::$var secret missing — EC2 leak verification cannot run"
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
# LLM-key requirement is per-runtime: claude-code accepts
|
||||
# EITHER MiniMax OR direct-Anthropic (whichever is set first),
|
||||
|
||||
@@ -44,6 +44,8 @@ name: E2E Peer Visibility (literal MCP list_peers)
|
||||
# - No cross-repo `uses:` (feedback_gitea_cross_repo_uses_blocked). The
|
||||
# actions/checkout SHA is the one e2e-staging-canvas.yml already uses
|
||||
# successfully (a mirrored SHA — see #1277/PR#1292 root-cause).
|
||||
# - 2026-05-21 retrigger: verify fresh platform-tenant image after the
|
||||
# publish Buildx DOCKER_CONFIG fix restored staging-latest image updates.
|
||||
# - Per-SHA concurrency, not global (feedback_concurrency_group_per_sha).
|
||||
# - Workflow-level GITHUB_SERVER_URL pinned
|
||||
# (feedback_act_runner_github_server_url).
|
||||
@@ -84,6 +86,7 @@ on:
|
||||
- 'workspace-server/internal/handlers/registry.go'
|
||||
- 'workspace-server/internal/handlers/workspace.go'
|
||||
- 'tests/e2e/test_peer_visibility_mcp_staging.sh'
|
||||
- 'tests/e2e/test_peer_visibility_token_mint_staging.sh'
|
||||
- 'tests/e2e/test_peer_visibility_mcp_local.sh'
|
||||
- 'tests/e2e/lib/peer_visibility_assert.sh'
|
||||
- '.gitea/workflows/e2e-peer-visibility.yml'
|
||||
@@ -96,6 +99,7 @@ on:
|
||||
- 'workspace-server/internal/handlers/registry.go'
|
||||
- 'workspace-server/internal/handlers/workspace.go'
|
||||
- 'tests/e2e/test_peer_visibility_mcp_staging.sh'
|
||||
- 'tests/e2e/test_peer_visibility_token_mint_staging.sh'
|
||||
- 'tests/e2e/test_peer_visibility_mcp_local.sh'
|
||||
- 'tests/e2e/lib/peer_visibility_assert.sh'
|
||||
- '.gitea/workflows/e2e-peer-visibility.yml'
|
||||
@@ -135,8 +139,14 @@ jobs:
|
||||
echo "lib/peer_visibility_assert.sh — bash syntax OK"
|
||||
bash -n tests/e2e/test_peer_visibility_mcp_staging.sh
|
||||
echo "test_peer_visibility_mcp_staging.sh — bash syntax OK"
|
||||
bash -n tests/e2e/test_peer_visibility_token_mint_staging.sh
|
||||
echo "test_peer_visibility_token_mint_staging.sh — bash syntax OK"
|
||||
bash -n tests/e2e/test_peer_visibility_mcp_local.sh
|
||||
echo "test_peer_visibility_mcp_local.sh — bash syntax OK"
|
||||
if rg -n '/admin/workspaces/.*/test-token|test-token' tests/e2e/test_*staging*.sh; then
|
||||
echo "::error::staging E2E must not use dev-only /admin/workspaces/:id/test-token; use production-safe admin token minting instead"
|
||||
exit 1
|
||||
fi
|
||||
echo "Staging fresh-provision MCP list_peers E2E runs on push to"
|
||||
echo "main / workflow_dispatch / daily cron (30+ min EC2 boot)."
|
||||
echo "The LOCAL backend runs in the peer-visibility-local job"
|
||||
|
||||
@@ -49,6 +49,8 @@ on:
|
||||
- 'workspace-server/internal/middleware/**'
|
||||
- 'workspace-server/internal/provisioner/**'
|
||||
- 'tests/e2e/test_staging_full_saas.sh'
|
||||
- 'tests/e2e/lib/aws_leak_check.sh'
|
||||
- 'tests/e2e/test_aws_leak_check.sh'
|
||||
- '.gitea/workflows/e2e-staging-saas.yml'
|
||||
pull_request:
|
||||
branches: [main]
|
||||
@@ -59,6 +61,8 @@ on:
|
||||
- 'workspace-server/internal/middleware/**'
|
||||
- 'workspace-server/internal/provisioner/**'
|
||||
- 'tests/e2e/test_staging_full_saas.sh'
|
||||
- 'tests/e2e/lib/aws_leak_check.sh'
|
||||
- 'tests/e2e/test_aws_leak_check.sh'
|
||||
- '.gitea/workflows/e2e-staging-saas.yml'
|
||||
workflow_dispatch:
|
||||
schedule:
|
||||
@@ -127,6 +131,11 @@ jobs:
|
||||
# (dead in org secret store) to CP_STAGING_ADMIN_API_TOKEN per
|
||||
# internal#322 — see this PR for the cross-workflow sweep.
|
||||
MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_DEFAULT_REGION: us-east-2
|
||||
E2E_AWS_LEAK_CHECK: required
|
||||
E2E_AWS_TERMINATE_LEAKS: '1'
|
||||
# MiniMax is the PRIMARY LLM auth path post-2026-05-04. Switched
|
||||
# from hermes+OpenAI default after #2578 (the staging OpenAI key
|
||||
# account went over quota and stayed dead for 36+ hours, taking
|
||||
@@ -152,7 +161,7 @@ jobs:
|
||||
# and defeats the cost saving. Operators can override via the
|
||||
# workflow_dispatch flow (no input wired here yet — runtime
|
||||
# override is enough for ad-hoc).
|
||||
E2E_MODEL_SLUG: ${{ github.event.inputs.runtime == 'hermes' && 'openai/gpt-4o' || github.event.inputs.runtime == 'langgraph' && 'openai:gpt-4o' || 'MiniMax-M2.7-highspeed' }}
|
||||
E2E_MODEL_SLUG: ${{ github.event.inputs.runtime == 'hermes' && 'openai/gpt-4o' || github.event.inputs.runtime == 'langgraph' && 'openai:gpt-4o' || 'MiniMax-M2' }}
|
||||
E2E_RUN_ID: "${{ github.run_id }}-${{ github.run_attempt }}"
|
||||
E2E_KEEP_ORG: ${{ github.event.inputs.keep_org && '1' || '0' }}
|
||||
|
||||
@@ -165,6 +174,12 @@ jobs:
|
||||
echo "::error::CP_STAGING_ADMIN_API_TOKEN secret not set (Railway staging CP_ADMIN_API_TOKEN)"
|
||||
exit 2
|
||||
fi
|
||||
for var in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY; do
|
||||
if [ -z "${!var:-}" ]; then
|
||||
echo "::error::$var not set — EC2 leak verification cannot run"
|
||||
exit 2
|
||||
fi
|
||||
done
|
||||
echo "Admin token present ✓"
|
||||
|
||||
- name: Verify LLM key present
|
||||
|
||||
@@ -47,6 +47,11 @@ jobs:
|
||||
# (dead in org secret store) to CP_STAGING_ADMIN_API_TOKEN per
|
||||
# internal#322 — see this PR for the cross-workflow sweep.
|
||||
MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_DEFAULT_REGION: us-east-2
|
||||
E2E_AWS_LEAK_CHECK: required
|
||||
E2E_AWS_TERMINATE_LEAKS: '1'
|
||||
E2E_MODE: smoke
|
||||
E2E_RUNTIME: hermes
|
||||
E2E_RUN_ID: "sanity-${{ github.run_id }}"
|
||||
@@ -61,6 +66,12 @@ jobs:
|
||||
echo "::error::CP_STAGING_ADMIN_API_TOKEN not set"
|
||||
exit 2
|
||||
fi
|
||||
for var in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY; do
|
||||
if [ -z "${!var:-}" ]; then
|
||||
echo "::error::$var not set — EC2 leak verification cannot run"
|
||||
exit 2
|
||||
fi
|
||||
done
|
||||
|
||||
# Inverted assertion: the run MUST fail. If it passes, the
|
||||
# E2E_INTENTIONAL_FAILURE path is broken.
|
||||
|
||||
@@ -25,8 +25,12 @@ name: publish-workspace-server-image
|
||||
# staging-<sha>. Set repo variable or secret PROD_AUTO_DEPLOY_DISABLED=true
|
||||
# to stop production rollout while keeping image publishing enabled.
|
||||
#
|
||||
# ECR target: 153263036946.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/*
|
||||
# Primary ECR target: 153263036946.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/*
|
||||
# Optional staging tenant mirror target:
|
||||
# 004947743811.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/platform-tenant
|
||||
# Required secrets: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AUTO_SYNC_TOKEN
|
||||
# Staging ECR grants the primary SSOT-managed publisher principal repository
|
||||
# policy access, so no persistent staging AWS access keys are required.
|
||||
#
|
||||
# mc#711: Docker daemon not accessible on ubuntu-latest runner (molecule-canonical-1
|
||||
# shows client-only in `docker info` — daemon not running). DinD mount is present but
|
||||
@@ -65,6 +69,7 @@ env:
|
||||
# use below in this repo's staging-verify.yml.
|
||||
IMAGE_NAME: ${{ vars.ECR_REGISTRY || '153263036946.dkr.ecr.us-east-2.amazonaws.com' }}/molecule-ai/platform
|
||||
TENANT_IMAGE_NAME: ${{ vars.ECR_REGISTRY || '153263036946.dkr.ecr.us-east-2.amazonaws.com' }}/molecule-ai/platform-tenant
|
||||
STAGING_TENANT_IMAGE_NAME: ${{ vars.STAGING_ECR_REGISTRY || '004947743811.dkr.ecr.us-east-2.amazonaws.com' }}/molecule-ai/platform-tenant
|
||||
|
||||
jobs:
|
||||
build-and-push:
|
||||
@@ -135,6 +140,18 @@ jobs:
|
||||
run: |
|
||||
echo "sha=${GITHUB_SHA::7}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
# Keep Buildx state inside the job temp dir. The publish runner's
|
||||
# inherited DOCKER_CONFIG can point at a host-owned ECR config path
|
||||
# (/home/hongming/.docker-ecr), which caused setup-buildx-action to
|
||||
# fail before image build with EACCES creating buildx/certs.
|
||||
- name: Prepare writable Docker config
|
||||
run: |
|
||||
set -euo pipefail
|
||||
export DOCKER_CONFIG="$RUNNER_TEMP/docker-config"
|
||||
mkdir -p "$DOCKER_CONFIG/buildx/certs"
|
||||
echo "DOCKER_CONFIG=$DOCKER_CONFIG" >> "$GITHUB_ENV"
|
||||
docker buildx version
|
||||
|
||||
# Build + push platform image (inline ECR auth — mirrors the operator-host
|
||||
# approach; credentials come from GITHUB_SECRET_AWS_ACCESS_KEY_ID /
|
||||
# GITHUB_SECRET_AWS_SECRET_ACCESS_KEY in Gitea Actions).
|
||||
@@ -170,9 +187,14 @@ jobs:
|
||||
--push .
|
||||
|
||||
# Build + push tenant image (Go platform + Next.js canvas in one image).
|
||||
# Push the same build to the staging account too so fresh staging/E2E
|
||||
# tenants can pull without cross-account ECR reads. The staging ECR repo
|
||||
# policy trusts the primary SSOT-managed publisher principal; do not add
|
||||
# separate persistent staging AWS access keys here.
|
||||
- name: Build & push tenant image to ECR (staging-<sha> + staging-latest)
|
||||
env:
|
||||
TENANT_IMAGE_NAME: ${{ env.TENANT_IMAGE_NAME }}
|
||||
STAGING_TENANT_IMAGE_NAME: ${{ env.STAGING_TENANT_IMAGE_NAME }}
|
||||
TAG_SHA: staging-${{ steps.tags.outputs.sha }}
|
||||
TAG_LATEST: staging-latest
|
||||
GIT_SHA: ${{ github.sha }}
|
||||
@@ -183,8 +205,19 @@ jobs:
|
||||
run: |
|
||||
set -euo pipefail
|
||||
ECR_REGISTRY="${TENANT_IMAGE_NAME%%/*}"
|
||||
STAGING_ECR_REGISTRY="${STAGING_TENANT_IMAGE_NAME%%/*}"
|
||||
aws ecr get-login-password --region us-east-2 | \
|
||||
docker login --username AWS --password-stdin "${ECR_REGISTRY}"
|
||||
aws ecr get-login-password --region us-east-2 | \
|
||||
docker login --username AWS --password-stdin "${STAGING_ECR_REGISTRY}"
|
||||
|
||||
build_tags=(
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_SHA}"
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_LATEST}"
|
||||
--tag "${STAGING_TENANT_IMAGE_NAME}:${TAG_SHA}"
|
||||
--tag "${STAGING_TENANT_IMAGE_NAME}:${TAG_LATEST}"
|
||||
)
|
||||
|
||||
docker buildx build \
|
||||
--file ./workspace-server/Dockerfile.tenant \
|
||||
--build-arg NEXT_PUBLIC_PLATFORM_URL= \
|
||||
@@ -193,8 +226,7 @@ jobs:
|
||||
--label "org.opencontainers.image.revision=${GIT_SHA}" \
|
||||
--label "org.opencontainers.image.created=$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
|
||||
--label "molecule.workflow.run_id=${GITHUB_RUN_ID}" \
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_SHA}" \
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_LATEST}" \
|
||||
"${build_tags[@]}" \
|
||||
--push .
|
||||
|
||||
# bp-exempt: production deploy side-effect; merge is gated by CI / all-required and this job waits for push CI before acting.
|
||||
|
||||
@@ -81,6 +81,11 @@ jobs:
|
||||
# (dead in org secret store) to CP_STAGING_ADMIN_API_TOKEN per
|
||||
# internal#322 — see this PR for the cross-workflow sweep.
|
||||
MOLECULE_ADMIN_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_DEFAULT_REGION: us-east-2
|
||||
E2E_AWS_LEAK_CHECK: required
|
||||
E2E_AWS_TERMINATE_LEAKS: '1'
|
||||
# MiniMax is the smoke's PRIMARY LLM auth path post-2026-05-04.
|
||||
# Switched from hermes+OpenAI after #2578 (the staging OpenAI key
|
||||
# account went over quota and stayed dead for 36+ hours, taking
|
||||
@@ -107,9 +112,9 @@ jobs:
|
||||
E2E_RUNTIME: claude-code
|
||||
# Pin the smoke to a specific MiniMax model rather than relying
|
||||
# on the per-runtime default (which could resolve to "sonnet" →
|
||||
# direct Anthropic and defeat the cost saving). M2.7-highspeed
|
||||
# is "Token Plan only" but cheap-per-token and fast.
|
||||
E2E_MODEL_SLUG: MiniMax-M2.7-highspeed
|
||||
# direct Anthropic and defeat the cost saving). MiniMax-M2 is the
|
||||
# stable staging MiniMax path used by the full-SaaS smoke.
|
||||
E2E_MODEL_SLUG: MiniMax-M2
|
||||
E2E_RUN_ID: "smoke-${{ github.run_id }}"
|
||||
# Debug-only: when an operator dispatches with keep_on_failure=true,
|
||||
# the smoke script's E2E_KEEP_ORG=1 path skips teardown so the
|
||||
@@ -129,6 +134,12 @@ jobs:
|
||||
echo "::error::CP_STAGING_ADMIN_API_TOKEN not set"
|
||||
exit 2
|
||||
fi
|
||||
for var in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY; do
|
||||
if [ -z "${!var:-}" ]; then
|
||||
echo "::error::$var not set — EC2 leak verification cannot run"
|
||||
exit 2
|
||||
fi
|
||||
done
|
||||
|
||||
- name: Verify LLM key present
|
||||
run: |
|
||||
|
||||
@@ -40,14 +40,12 @@ name: Sweep stale AWS Secrets Manager secrets
|
||||
# the mostly-orphan tunnels) refuses to nuke past the threshold.
|
||||
|
||||
on:
|
||||
# Disabled as an hourly schedule until the dedicated
|
||||
# AWS_SECRETS_JANITOR_* key exists in the key-management SSOT and is
|
||||
# mirrored into Gitea. Falling back to the molecule-cp app principal is
|
||||
# intentionally not allowed: it lacks account-wide ListSecrets, and
|
||||
# granting that to an application credential would weaken least privilege.
|
||||
#
|
||||
# Keep the manual trigger so operators can validate the workflow immediately
|
||||
# after provisioning the janitor key, then restore the hourly :30 schedule.
|
||||
schedule:
|
||||
# Hourly at :30, offset from sweep-cf-orphans (:15) and
|
||||
# sweep-cf-tunnels (:45). This janitor is intentionally schedule-only
|
||||
# for deletes; manual dispatch is forced to dry-run below because Gitea
|
||||
# 1.22.6 rejects workflow_dispatch.inputs.
|
||||
- cron: '30 * * * *'
|
||||
workflow_dispatch:
|
||||
# Don't let two sweeps race the same AWS account.
|
||||
concurrency:
|
||||
@@ -64,22 +62,24 @@ jobs:
|
||||
sweep:
|
||||
name: Sweep AWS Secrets Manager
|
||||
runs-on: ubuntu-latest
|
||||
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
|
||||
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
|
||||
continue-on-error: true
|
||||
# This is a cost/leak janitor. A scheduled failure must be red so
|
||||
# operators know tenant bootstrap secrets may be leaking.
|
||||
# 30 min cap, mirroring the other janitors. AWS DeleteSecret is
|
||||
# fast (~0.3s/call) so even a 100+ backlog drains in seconds
|
||||
# under the 8-way xargs parallelism, but the cap is set generously
|
||||
# to leave headroom for any actual API hang.
|
||||
timeout-minutes: 30
|
||||
env:
|
||||
AWS_REGION: ${{ secrets.AWS_REGION || 'us-east-1' }}
|
||||
# Keep this literal. Gitea/act_runner 1.22.6 can mis-render
|
||||
# secret-backed expressions with `||`, which produced an invalid
|
||||
# Secrets Manager endpoint in the scheduled janitor.
|
||||
AWS_REGION: us-east-2
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_SECRETS_JANITOR_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRETS_JANITOR_SECRET_ACCESS_KEY }}
|
||||
CP_ADMIN_API_TOKEN: ${{ secrets.CP_ADMIN_API_TOKEN }}
|
||||
CP_STAGING_ADMIN_API_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
MAX_DELETE_PCT: ${{ github.event.inputs.max_delete_pct || '50' }}
|
||||
GRACE_HOURS: ${{ github.event.inputs.grace_hours || '24' }}
|
||||
MAX_DELETE_PCT: 50
|
||||
GRACE_HOURS: 24
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
@@ -114,17 +114,25 @@ jobs:
|
||||
|
||||
- name: Run sweep
|
||||
if: steps.verify.outputs.skip != 'true'
|
||||
# Schedule-vs-dispatch dry-run asymmetry mirrors sweep-cf-tunnels:
|
||||
# - Scheduled: input empty → "false" → --execute (the whole
|
||||
# point of an hourly janitor).
|
||||
# - Manual workflow_dispatch: input default true → dry-run;
|
||||
# operator must flip it to actually delete.
|
||||
# Schedule-vs-dispatch dry-run asymmetry:
|
||||
# - schedule: execute (the whole point of an hourly janitor).
|
||||
# - workflow_dispatch: dry-run. Gitea 1.22.6 rejects
|
||||
# workflow_dispatch.inputs, so there is no safe manual
|
||||
# "flip it to execute" toggle in this workflow.
|
||||
# The script's MAX_DELETE_PCT gate (default 50%) remains the
|
||||
# second line of defense regardless of trigger.
|
||||
run: |
|
||||
set -euo pipefail
|
||||
if [ "${{ github.event.inputs.dry_run || 'false' }}" = "true" ]; then
|
||||
if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
|
||||
echo "Running in dry-run mode — no deletions"
|
||||
bash scripts/ops/sweep-aws-secrets.sh
|
||||
else
|
||||
echo "Running with --execute — will delete identified orphans"
|
||||
bash scripts/ops/sweep-aws-secrets.sh --execute
|
||||
fi
|
||||
|
||||
- name: Notify on sweep failure
|
||||
if: failure()
|
||||
run: |
|
||||
echo "::error::sweep-aws-secrets FAILED — AWS tenant bootstrap secrets may be leaking. Check missing Gitea secrets, staging/prod CP admin tokens, AWS janitor IAM permissions, or the script safety gate."
|
||||
exit 1
|
||||
|
||||
+18
-1
@@ -127,7 +127,11 @@ cd workspace-server && go test -race ./...
|
||||
cd canvas && npm test
|
||||
|
||||
# Workspace runtime (Python)
|
||||
cd workspace && python -m pytest -v
|
||||
# Runtime code is SSOT in molecule-ai-workspace-runtime, not molecule-core/workspace.
|
||||
cd ../molecule-ai-workspace-runtime
|
||||
python -m venv .venv && source .venv/bin/activate
|
||||
pip install --index-url https://git.moleculesai.app/api/packages/molecule-ai/pypi/simple/ -e . pytest pytest-asyncio
|
||||
pytest -q
|
||||
|
||||
# E2E API tests (requires running platform)
|
||||
bash tests/e2e/test_api.sh
|
||||
@@ -159,6 +163,19 @@ and run CI manually.
|
||||
| review-check-tests | `review-check.sh` evaluator regression suite (13 scenarios) |
|
||||
| ops-scripts | Python unittest suite for `scripts/*.py` |
|
||||
|
||||
### Workspace runtime SSOT
|
||||
|
||||
Runtime code lives in
|
||||
[`molecule-ai-workspace-runtime`](https://git.moleculesai.app/molecule-ai/molecule-ai-workspace-runtime).
|
||||
Do not reintroduce `molecule-core/workspace/` or vendored `molecule_runtime/`
|
||||
copies in consumers. Core and templates consume the published runtime package
|
||||
from the Gitea package registry.
|
||||
|
||||
For local external MCP agents, multi-workspace config is
|
||||
`MOLECULE_WORKSPACES=[{"id":"...","token":"...","platform_url":"..."}]`.
|
||||
`platform_url` selects the tenant; `org_id` is not part of this config.
|
||||
Workspace IDs can differ across orgs.
|
||||
|
||||
## Local Testing
|
||||
|
||||
### review-check.sh
|
||||
|
||||
@@ -285,6 +285,39 @@ Canvas requests (no `X-Workspace-ID` header) and system callers
|
||||
|
||||
---
|
||||
|
||||
## Multiple Workspaces From One Local MCP Bridge
|
||||
|
||||
The standalone runtime package includes `molecule-mcp`, a local MCP bridge for
|
||||
external agents such as Claude Code, Codex, Hermes, and other tools that run
|
||||
outside the platform container fleet. One local bridge can serve multiple
|
||||
external workspaces by setting `MOLECULE_WORKSPACES`:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"id": "workspace-id-local-to-hongming-org",
|
||||
"token": "...",
|
||||
"platform_url": "https://hongming.moleculesai.app"
|
||||
},
|
||||
{
|
||||
"id": "different-workspace-id-local-to-agents-team-org",
|
||||
"token": "...",
|
||||
"platform_url": "https://agents-team.moleculesai.app"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
`platform_url` is the tenant routing key. The bridge registers, heartbeats,
|
||||
polls inboxes, and sends outbound A2A calls against the URL attached to the
|
||||
workspace that is doing the work.
|
||||
|
||||
Do not add `org_id` to this config. The tenant already comes from
|
||||
`platform_url`, and the bearer token is issued by that tenant. Workspace IDs
|
||||
also do not need to be shared across orgs; each tenant can return its own
|
||||
workspace ID and token for the same local agent process.
|
||||
|
||||
---
|
||||
|
||||
## Canvas Appearance
|
||||
|
||||
External workspaces appear on the canvas with a purple **REMOTE** badge
|
||||
|
||||
@@ -135,6 +135,33 @@ The `id` field is your workspace ID — remember it.
|
||||
|
||||
---
|
||||
|
||||
## Optional — one local MCP bridge, multiple tenants
|
||||
|
||||
If your local agent runtime uses `molecule-mcp`, one process can serve more
|
||||
than one external workspace:
|
||||
|
||||
```bash
|
||||
export MOLECULE_WORKSPACES='[
|
||||
{
|
||||
"id": "workspace-id-local-to-you-org",
|
||||
"token": "...",
|
||||
"platform_url": "https://you.moleculesai.app"
|
||||
},
|
||||
{
|
||||
"id": "different-workspace-id-local-to-team-org",
|
||||
"token": "...",
|
||||
"platform_url": "https://team.moleculesai.app"
|
||||
}
|
||||
]'
|
||||
molecule-mcp
|
||||
```
|
||||
|
||||
Use the workspace ID and token returned by each tenant. The IDs may differ
|
||||
across orgs. `org_id` is not required here because `platform_url` selects the
|
||||
tenant and the token is tenant-scoped.
|
||||
|
||||
---
|
||||
|
||||
## Step 4 — Chat with it
|
||||
|
||||
1. Open your Molecule canvas at `https://<TENANT>`
|
||||
|
||||
@@ -125,6 +125,33 @@ The agent appears on the canvas with a **purple REMOTE badge** within seconds. F
|
||||
|
||||
---
|
||||
|
||||
## Multi-Tenant Local MCP Bridge
|
||||
|
||||
For local MCP-driven agents, use the standalone runtime's `molecule-mcp`
|
||||
entrypoint. A single local bridge can serve multiple external workspaces by
|
||||
setting `MOLECULE_WORKSPACES`:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"id": "workspace-id-local-to-acme",
|
||||
"token": "...",
|
||||
"platform_url": "https://acme.moleculesai.app"
|
||||
},
|
||||
{
|
||||
"id": "different-workspace-id-local-to-ops",
|
||||
"token": "...",
|
||||
"platform_url": "https://ops.moleculesai.app"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
`platform_url` selects the tenant for registration, heartbeat, inbox polling,
|
||||
and outbound A2A routing. `org_id` is not required in this config, and the
|
||||
workspace IDs do not need to match across tenants.
|
||||
|
||||
---
|
||||
|
||||
## What Phase 30 Covers
|
||||
|
||||
| Phase | What shipped | Endpoint |
|
||||
|
||||
+1
-1
@@ -36,7 +36,7 @@ e2e_mint_test_token() {
|
||||
local admin_bearer="${MOLECULE_ADMIN_TOKEN:-${ADMIN_TOKEN:-}}"
|
||||
local admin_auth=()
|
||||
[ -n "$admin_bearer" ] && admin_auth=(-H "Authorization: Bearer $admin_bearer")
|
||||
body=$(curl -s -w "\n%{http_code}" "$BASE/admin/workspaces/$wid/test-token" "${admin_auth[@]}")
|
||||
body=$(curl -s -w "\n%{http_code}" "$BASE/admin/workspaces/$wid/test-token" ${admin_auth[@]+"${admin_auth[@]}"})
|
||||
local code
|
||||
code=$(printf '%s' "$body" | tail -n1)
|
||||
local json
|
||||
|
||||
Executable
+116
@@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# EC2 leak check for staging E2E harnesses.
|
||||
#
|
||||
# Modes:
|
||||
# E2E_AWS_LEAK_CHECK=off skip
|
||||
# E2E_AWS_LEAK_CHECK=auto check only when aws + credentials exist
|
||||
# E2E_AWS_LEAK_CHECK=required fail if aws + credentials are unavailable
|
||||
#
|
||||
# Optional:
|
||||
# E2E_AWS_LEAK_CHECK_SECS poll budget, default 90
|
||||
# E2E_AWS_LEAK_CHECK_INTERVAL poll interval, default 10
|
||||
# E2E_AWS_TERMINATE_LEAKS=1 terminate matching leaked instances
|
||||
|
||||
e2e_aws_leak_mode() {
|
||||
echo "${E2E_AWS_LEAK_CHECK:-auto}"
|
||||
}
|
||||
|
||||
e2e_aws_region() {
|
||||
echo "${E2E_AWS_REGION:-${AWS_REGION:-${AWS_DEFAULT_REGION:-us-east-2}}}"
|
||||
}
|
||||
|
||||
e2e_aws_creds_available() {
|
||||
command -v aws >/dev/null 2>&1 || return 1
|
||||
[ -n "${AWS_ACCESS_KEY_ID:-}" ] || return 1
|
||||
[ -n "${AWS_SECRET_ACCESS_KEY:-}" ] || return 1
|
||||
}
|
||||
|
||||
e2e_ec2_instances_for_slug() {
|
||||
local slug="$1"
|
||||
local region
|
||||
region=$(e2e_aws_region)
|
||||
|
||||
# shellcheck disable=SC2016
|
||||
aws ec2 describe-instances \
|
||||
--region "$region" \
|
||||
--filters "Name=tag:Name,Values=*$slug*" \
|
||||
"Name=instance-state-name,Values=pending,running,stopping,stopped" \
|
||||
--query 'Reservations[].Instances[].[InstanceId,State.Name,Tags[?Key==`Name`].Value|[0]]' \
|
||||
--output text
|
||||
}
|
||||
|
||||
e2e_terminate_instances() {
|
||||
local ids="$1"
|
||||
local region
|
||||
region=$(e2e_aws_region)
|
||||
|
||||
[ -n "$ids" ] || return 0
|
||||
# shellcheck disable=SC2086
|
||||
aws ec2 terminate-instances --region "$region" --instance-ids $ids >/dev/null
|
||||
}
|
||||
|
||||
e2e_verify_no_ec2_leaks_for_slug() {
|
||||
local slug="$1"
|
||||
local mode
|
||||
local max_secs
|
||||
local interval
|
||||
local elapsed=0
|
||||
local rows=""
|
||||
local ids=""
|
||||
|
||||
mode=$(e2e_aws_leak_mode)
|
||||
case "$mode" in
|
||||
off)
|
||||
echo "[aws-leak-check] skipped: E2E_AWS_LEAK_CHECK=off" >&2
|
||||
return 0
|
||||
;;
|
||||
auto|required) ;;
|
||||
*)
|
||||
echo "[aws-leak-check] invalid E2E_AWS_LEAK_CHECK=$mode (expected off|auto|required)" >&2
|
||||
return 2
|
||||
;;
|
||||
esac
|
||||
|
||||
if ! e2e_aws_creds_available; then
|
||||
if [ "$mode" = "required" ]; then
|
||||
echo "[aws-leak-check] required but aws CLI or AWS credentials are unavailable" >&2
|
||||
return 2
|
||||
fi
|
||||
echo "[aws-leak-check] skipped: aws CLI or AWS credentials unavailable" >&2
|
||||
return 0
|
||||
fi
|
||||
|
||||
max_secs="${E2E_AWS_LEAK_CHECK_SECS:-90}"
|
||||
interval="${E2E_AWS_LEAK_CHECK_INTERVAL:-10}"
|
||||
|
||||
while true; do
|
||||
rows=$(e2e_ec2_instances_for_slug "$slug" 2>&1) || {
|
||||
echo "[aws-leak-check] aws ec2 describe-instances failed for slug=$slug" >&2
|
||||
echo "$rows" >&2
|
||||
return 2
|
||||
}
|
||||
|
||||
if [ -z "$rows" ] || [ "$rows" = "None" ]; then
|
||||
echo "[aws-leak-check] no live EC2 instances for slug=$slug" >&2
|
||||
return 0
|
||||
fi
|
||||
|
||||
if [ "$elapsed" -ge "$max_secs" ]; then
|
||||
echo "[aws-leak-check] leaked EC2 instance(s) for slug=$slug after ${elapsed}s:" >&2
|
||||
echo "$rows" >&2
|
||||
if [ "${E2E_AWS_TERMINATE_LEAKS:-0}" = "1" ]; then
|
||||
ids=$(echo "$rows" | awk 'NF {print $1}' | sort -u | tr '\n' ' ')
|
||||
echo "[aws-leak-check] terminating leaked EC2 instance(s): $ids" >&2
|
||||
e2e_terminate_instances "$ids" || {
|
||||
echo "[aws-leak-check] terminate-instances failed for: $ids" >&2
|
||||
return 4
|
||||
}
|
||||
fi
|
||||
return 4
|
||||
fi
|
||||
|
||||
sleep "$interval"
|
||||
elapsed=$((elapsed + interval))
|
||||
done
|
||||
}
|
||||
@@ -19,11 +19,18 @@
|
||||
# PR #2558+#2563+#2567 cleared the
|
||||
# masking layers.)
|
||||
#
|
||||
# claude-code → "sonnet" (entry-id form: claude-code template's
|
||||
# config.yaml uses bare model names,
|
||||
# auth comes via CLAUDE_CODE_OAUTH_TOKEN
|
||||
# or ANTHROPIC_API_KEY rather than the
|
||||
# slug.)
|
||||
# claude-code → auth-aware:
|
||||
# E2E_MINIMAX_API_KEY → "MiniMax-M2"
|
||||
# E2E_ANTHROPIC_API_KEY → "claude-sonnet-4-6"
|
||||
# otherwise → "sonnet"
|
||||
#
|
||||
# claude-code provider routing is model-driven. The bare
|
||||
# "sonnet" alias selects the OAuth provider, so it is only a
|
||||
# good default when the canary is using Claude Code OAuth or
|
||||
# intentionally exercising the missing-auth path. MiniMax and
|
||||
# direct Anthropic API keys need model IDs that resolve to
|
||||
# their provider entries, otherwise the workspace boots
|
||||
# reachable but the first A2A call hits the wrong auth path.
|
||||
#
|
||||
# When E2E_MODEL_SLUG is set, it overrides this dispatch — useful when an
|
||||
# operator dispatches the workflow to test a specific slug.
|
||||
@@ -45,7 +52,15 @@ pick_model_slug() {
|
||||
case "$runtime" in
|
||||
hermes) printf 'openai/gpt-4o' ;;
|
||||
langgraph) printf 'openai:gpt-4o' ;;
|
||||
claude-code) printf 'sonnet' ;;
|
||||
claude-code)
|
||||
if [ -n "${E2E_MINIMAX_API_KEY:-}" ]; then
|
||||
printf 'MiniMax-M2'
|
||||
elif [ -n "${E2E_ANTHROPIC_API_KEY:-}" ]; then
|
||||
printf 'claude-sonnet-4-6'
|
||||
else
|
||||
printf 'sonnet'
|
||||
fi
|
||||
;;
|
||||
*) printf 'openai/gpt-4o' ;; # safest fallback (matches hermes)
|
||||
esac
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ pv_assert_runtime() {
|
||||
set +e
|
||||
resp=$(curl -sS -X POST "$base_url/workspaces/$wid/mcp" \
|
||||
-H "Authorization: Bearer $wtok" \
|
||||
"${org_header[@]}" \
|
||||
${org_header[@]+"${org_header[@]}"} \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$PV_RPC_BODY" \
|
||||
-o /tmp/pv_mcp_body.json -w "%{http_code}" 2>/dev/null)
|
||||
|
||||
Executable
+109
@@ -0,0 +1,109 @@
|
||||
#!/usr/bin/env bash
|
||||
set -uo pipefail
|
||||
|
||||
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck disable=SC1091
|
||||
# shellcheck source=lib/aws_leak_check.sh
|
||||
source "$SCRIPT_DIR/lib/aws_leak_check.sh"
|
||||
|
||||
PASS=0
|
||||
FAIL=0
|
||||
|
||||
TMPDIR_E2E=$(mktemp -d -t aws-leak-check-e2e-XXXXXX)
|
||||
trap 'rm -rf "$TMPDIR_E2E"' EXIT INT TERM
|
||||
|
||||
make_fake_aws() {
|
||||
local body="$1"
|
||||
mkdir -p "$TMPDIR_E2E/bin"
|
||||
cat > "$TMPDIR_E2E/bin/aws" <<EOF
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
echo "\$*" >> "$TMPDIR_E2E/aws.calls"
|
||||
$body
|
||||
EOF
|
||||
chmod +x "$TMPDIR_E2E/bin/aws"
|
||||
}
|
||||
|
||||
reset_env() {
|
||||
/bin/rm -f "$TMPDIR_E2E/aws.calls"
|
||||
export PATH="$TMPDIR_E2E/bin:$ORIG_PATH"
|
||||
export AWS_ACCESS_KEY_ID=test-access
|
||||
export AWS_SECRET_ACCESS_KEY=test-secret
|
||||
export AWS_DEFAULT_REGION=us-east-2
|
||||
export E2E_AWS_LEAK_CHECK=required
|
||||
export E2E_AWS_LEAK_CHECK_SECS=0
|
||||
export E2E_AWS_LEAK_CHECK_INTERVAL=1
|
||||
unset E2E_AWS_TERMINATE_LEAKS
|
||||
}
|
||||
|
||||
assert_rc() {
|
||||
local label="$1"
|
||||
local expected="$2"
|
||||
shift 2
|
||||
local observed
|
||||
"$@" >/tmp/aws-leak-check.out 2>/tmp/aws-leak-check.err
|
||||
observed=$?
|
||||
if [ "$observed" = "$expected" ]; then
|
||||
echo " PASS $label"
|
||||
PASS=$((PASS + 1))
|
||||
else
|
||||
echo " FAIL $label: expected rc=$expected observed=$observed" >&2
|
||||
echo " stderr:" >&2
|
||||
sed 's/^/ /' /tmp/aws-leak-check.err >&2
|
||||
FAIL=$((FAIL + 1))
|
||||
fi
|
||||
}
|
||||
|
||||
ORIG_PATH="$PATH"
|
||||
|
||||
echo "Test: AWS EC2 leak check helper"
|
||||
|
||||
reset_env
|
||||
/bin/rm -rf "${TMPDIR_E2E:?}/bin"
|
||||
/bin/mkdir -p "$TMPDIR_E2E/noaws"
|
||||
export PATH="$TMPDIR_E2E/noaws"
|
||||
export E2E_AWS_LEAK_CHECK=auto
|
||||
assert_rc "auto mode skips when aws is unavailable" 0 e2e_verify_no_ec2_leaks_for_slug e2e-smoke-test
|
||||
|
||||
reset_env
|
||||
/bin/rm -rf "${TMPDIR_E2E:?}/bin"
|
||||
/bin/mkdir -p "$TMPDIR_E2E/noaws"
|
||||
export PATH="$TMPDIR_E2E/noaws"
|
||||
export E2E_AWS_LEAK_CHECK=required
|
||||
assert_rc "required mode fails when aws is unavailable" 2 e2e_verify_no_ec2_leaks_for_slug e2e-smoke-test
|
||||
|
||||
reset_env
|
||||
# shellcheck disable=SC2016
|
||||
make_fake_aws 'if [ "$1 $2" = "ec2 describe-instances" ]; then exit 0; fi'
|
||||
assert_rc "no matching EC2 returns clean" 0 e2e_verify_no_ec2_leaks_for_slug e2e-smoke-test
|
||||
|
||||
reset_env
|
||||
# shellcheck disable=SC2016
|
||||
make_fake_aws 'if [ "$1 $2" = "ec2 describe-instances" ]; then echo "i-123 running ws-tenant-e2e-smoke-test-abc"; exit 0; fi'
|
||||
assert_rc "persistent matching EC2 is a leak" 4 e2e_verify_no_ec2_leaks_for_slug e2e-smoke-test
|
||||
|
||||
reset_env
|
||||
export E2E_AWS_TERMINATE_LEAKS=1
|
||||
# shellcheck disable=SC2016
|
||||
make_fake_aws '
|
||||
if [ "$1 $2" = "ec2 describe-instances" ]; then
|
||||
echo "i-123 running ws-tenant-e2e-smoke-test-abc"
|
||||
exit 0
|
||||
fi
|
||||
if [ "$1 $2" = "ec2 terminate-instances" ]; then
|
||||
echo "terminated" >/dev/null
|
||||
exit 0
|
||||
fi
|
||||
'
|
||||
assert_rc "terminate mode attempts cleanup before returning leak" 4 e2e_verify_no_ec2_leaks_for_slug e2e-smoke-test
|
||||
if grep -q "terminate-instances" "$TMPDIR_E2E/aws.calls"; then
|
||||
echo " PASS terminate-instances was called"
|
||||
PASS=$((PASS + 1))
|
||||
else
|
||||
echo " FAIL terminate-instances was not called" >&2
|
||||
FAIL=$((FAIL + 1))
|
||||
fi
|
||||
|
||||
echo
|
||||
echo "passed=$PASS failed=$FAIL"
|
||||
[ "$FAIL" = "0" ]
|
||||
@@ -16,7 +16,7 @@ set -uo pipefail
|
||||
# Resolve to the lib relative to this test file so the test runs from
|
||||
# any cwd (CI, local invocation, repo root).
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
# shellcheck source=lib/model_slug.sh
|
||||
# shellcheck source=tests/e2e/lib/model_slug.sh
|
||||
source "$SCRIPT_DIR/lib/model_slug.sh"
|
||||
|
||||
PASS=0
|
||||
@@ -48,7 +48,16 @@ echo
|
||||
# ── Per-runtime branches (the load-bearing ones for synth-E2E) ──
|
||||
run_test "hermes → slash-form (derive-provider.sh contract)" hermes "openai/gpt-4o"
|
||||
run_test "langgraph → colon-form (init_chat_model contract)" langgraph "openai:gpt-4o"
|
||||
run_test "claude-code → bare model name (entry-id form)" claude-code "sonnet"
|
||||
run_test "claude-code → OAuth/default alias" claude-code "sonnet"
|
||||
|
||||
got=$(unset E2E_MODEL_SLUG E2E_ANTHROPIC_API_KEY; E2E_MINIMAX_API_KEY="mx-test" pick_model_slug claude-code)
|
||||
assert_eq "claude-code + MiniMax key → MiniMax model" "$got" "MiniMax-M2"
|
||||
|
||||
got=$(unset E2E_MODEL_SLUG E2E_MINIMAX_API_KEY; E2E_ANTHROPIC_API_KEY="sk-ant-test" pick_model_slug claude-code)
|
||||
assert_eq "claude-code + Anthropic API key → Anthropic API model" "$got" "claude-sonnet-4-6"
|
||||
|
||||
got=$(unset E2E_MODEL_SLUG; E2E_MINIMAX_API_KEY="mx-priority" E2E_ANTHROPIC_API_KEY="sk-ant-loser" pick_model_slug claude-code)
|
||||
assert_eq "claude-code + both keys → MiniMax priority" "$got" "MiniMax-M2"
|
||||
|
||||
# ── Fallback for unknown runtime ──
|
||||
# Picks slash-form (hermes-shaped) since hermes is the historical
|
||||
|
||||
@@ -24,7 +24,8 @@
|
||||
#
|
||||
# Only PROVISIONING differs from staging:
|
||||
# - staging: POST /cp/admin/orgs (cold EC2 tenant) + per-tenant admin
|
||||
# token + each workspace's auth_token from the POST /workspaces resp.
|
||||
# token + each workspace's MCP bearer from create response or an admin
|
||||
# token-mint fallback.
|
||||
# - local: POST /workspaces directly against the local stack
|
||||
# (BASE, default http://localhost:8080), MCP bearer minted via
|
||||
# GET /admin/workspaces/:id/test-token (e2e_mint_test_token —
|
||||
@@ -103,7 +104,7 @@ teardown() {
|
||||
log "[teardown] deleting ${#CREATED_WSIDS[@]} workspace(s) this run created (scoped)"
|
||||
for wid in ${CREATED_WSIDS[@]+"${CREATED_WSIDS[@]}"}; do
|
||||
[ -n "$wid" ] || continue
|
||||
curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" "${ADMIN_AUTH[@]}" >/dev/null 2>&1 || true
|
||||
curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} >/dev/null 2>&1 || true
|
||||
done
|
||||
exit $rc
|
||||
}
|
||||
@@ -112,7 +113,7 @@ trap teardown EXIT INT TERM
|
||||
# Pre-sweep workspaces a prior crashed run of THIS script left behind
|
||||
# (name prefix match only — never a blanket delete). The trap fires on
|
||||
# normal exit, but a kill -9 / SIGPIPE can bypass it.
|
||||
PRIOR=$(curl -s "$BASE/workspaces" "${ADMIN_AUTH[@]}" | python3 -c '
|
||||
PRIOR=$(curl -s "$BASE/workspaces" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} | python3 -c '
|
||||
import json, sys
|
||||
try:
|
||||
print(" ".join(w["id"] for w in json.load(sys.stdin) if w.get("name","").startswith("PV-Local-")))
|
||||
@@ -121,7 +122,7 @@ except Exception:
|
||||
' 2>/dev/null)
|
||||
for _wid in $PRIOR; do
|
||||
log "Pre-sweeping prior PV-Local workspace: $_wid"
|
||||
curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" "${ADMIN_AUTH[@]}" >/dev/null 2>&1 || true
|
||||
curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} >/dev/null 2>&1 || true
|
||||
done
|
||||
|
||||
# ─── Local-stack preflight ─────────────────────────────────────────────
|
||||
@@ -132,10 +133,10 @@ if ! curl -fsS "$BASE/health" -m 5 >/dev/null 2>&1; then
|
||||
fi
|
||||
# admin/test-token is the local MCP-bearer mint path; it 404s in
|
||||
# production. If it is off, this gate cannot drive the literal call.
|
||||
if ! curl -fsS "$BASE/admin/workspaces/preflight-probe/test-token" "${ADMIN_AUTH[@]}" -m 5 >/dev/null 2>&1; then
|
||||
if ! curl -fsS "$BASE/admin/workspaces/preflight-probe/test-token" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -m 5 >/dev/null 2>&1; then
|
||||
# A 404 here is EITHER "no such ws" (fine — endpoint is enabled) OR the
|
||||
# endpoint is disabled (MOLECULE_ENV=production). Distinguish by body.
|
||||
PROBE=$(curl -s "$BASE/admin/workspaces/preflight-probe/test-token" "${ADMIN_AUTH[@]}" -m 5 2>/dev/null)
|
||||
PROBE=$(curl -s "$BASE/admin/workspaces/preflight-probe/test-token" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -m 5 2>/dev/null)
|
||||
if echo "$PROBE" | grep -qi 'production\|disabled\|not found.*endpoint'; then
|
||||
echo "::error::GET /admin/workspaces/:id/test-token disabled (MOLECULE_ENV=production?). Cannot mint a local MCP bearer." >&2
|
||||
exit 1
|
||||
@@ -240,7 +241,7 @@ else
|
||||
fi
|
||||
log "1/5 provisioning parent ($PARENT_RUNTIME, mode=$PV_LOCAL_PROVISION_MODE) + one sibling per runtime under test..."
|
||||
|
||||
P_RESP=$(curl -s -X POST "$BASE/workspaces" "${ADMIN_AUTH[@]}" -H "Content-Type: application/json" \
|
||||
P_RESP=$(curl -s -X POST "$BASE/workspaces" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -H "Content-Type: application/json" \
|
||||
-d "{\"name\":\"${NAME_PREFIX}-parent\",\"runtime\":\"$PARENT_RUNTIME\",\"tier\":3$PARENT_EXTRA,\"secrets\":$PARENT_SECRETS}")
|
||||
PARENT_ID=$(echo "$P_RESP" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("id",""))' 2>/dev/null)
|
||||
if [ -z "$PARENT_ID" ]; then
|
||||
@@ -290,7 +291,7 @@ for rt in $PV_RUNTIMES; do
|
||||
CREATE_RUNTIME="$rt"
|
||||
CREATE_EXTRA=""
|
||||
fi
|
||||
R=$(curl -s -X POST "$BASE/workspaces" "${ADMIN_AUTH[@]}" -H "Content-Type: application/json" \
|
||||
R=$(curl -s -X POST "$BASE/workspaces" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -H "Content-Type: application/json" \
|
||||
-d "{\"name\":\"${NAME_PREFIX}-$rt\",\"runtime\":\"$CREATE_RUNTIME\",\"tier\":2,\"parent_id\":\"$PARENT_ID\"$CREATE_EXTRA,\"secrets\":$SEC}")
|
||||
WID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("id",""))' 2>/dev/null)
|
||||
if [ -z "$WID" ]; then
|
||||
|
||||
@@ -40,8 +40,10 @@
|
||||
# drives: POST /cp/admin/orgs (provision), GET
|
||||
# /cp/admin/orgs/:slug/admin-token (per-tenant token), DELETE
|
||||
# /cp/admin/tenants/:slug (teardown). The per-tenant admin token drives
|
||||
# tenant workspace creation; each workspace's OWN auth_token (returned by
|
||||
# POST /workspaces) drives its MCP call.
|
||||
# tenant workspace creation; each workspace's OWN auth_token drives its
|
||||
# MCP call. External-like runtimes may return the token in POST
|
||||
# /workspaces; managed container runtimes usually require the admin token
|
||||
# mint fallback below.
|
||||
#
|
||||
# Required env:
|
||||
# MOLECULE_ADMIN_TOKEN CP admin bearer — Railway staging CP_ADMIN_API_TOKEN
|
||||
@@ -52,6 +54,9 @@
|
||||
# E2E_PROVISION_TIMEOUT_SECS default 1800 (hermes/openclaw cold EC2 budget)
|
||||
# E2E_MINIMAX_API_KEY / E2E_ANTHROPIC_API_KEY / E2E_OPENAI_API_KEY
|
||||
# LLM provider key injected so the runtime can boot
|
||||
# PV_TOKEN_DIAGNOSTIC_ONLY
|
||||
# 1 -> stop after create/token acquisition. Useful
|
||||
# to classify Hermes-only vs shared auth-route issues.
|
||||
# E2E_KEEP_ORG 1 → skip teardown (local debugging only)
|
||||
#
|
||||
# Exit codes:
|
||||
@@ -104,6 +109,46 @@ tenant_call() {
|
||||
-H "Content-Type: application/json" "$@"
|
||||
}
|
||||
|
||||
tenant_call_capture() {
|
||||
local method="$1" path="$2" out="$3"; shift 3
|
||||
curl -sS -o "$out" -w "%{http_code}" -X "$method" "$TENANT_URL$path" \
|
||||
-H "Authorization: Bearer $TENANT_TOKEN" \
|
||||
-H "X-Molecule-Org-Id: $ORG_ID" \
|
||||
-H "Content-Type: application/json" "$@"
|
||||
}
|
||||
|
||||
redact_token_body() {
|
||||
python3 -c '
|
||||
import json, re, sys
|
||||
raw = sys.stdin.read()
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except Exception:
|
||||
print(re.sub(r"(?i)([a-z0-9_]*token)=([^&\\s]+)", r"\1=<redacted>", raw)[:500])
|
||||
raise SystemExit(0)
|
||||
|
||||
def scrub(v):
|
||||
if isinstance(v, dict):
|
||||
return {k: ("<redacted>" if "token" in k.lower() else scrub(val)) for k, val in v.items()}
|
||||
if isinstance(v, list):
|
||||
return [scrub(x) for x in v]
|
||||
return v
|
||||
|
||||
print(json.dumps(scrub(data), separators=(",", ":"))[:500])
|
||||
'
|
||||
}
|
||||
|
||||
extract_auth_token() {
|
||||
python3 -c "
|
||||
import sys, json
|
||||
try:
|
||||
d = json.load(sys.stdin)
|
||||
except Exception:
|
||||
print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or d.get('connection', {}).get('auth_token') or '')
|
||||
" 2>/dev/null
|
||||
}
|
||||
|
||||
# ─── Scoped teardown ───────────────────────────────────────────────────
|
||||
# Deletes ONLY the org this run created (DELETE /cp/admin/tenants/$SLUG
|
||||
# with the {"confirm":$SLUG} fat-finger guard). Never a cluster-wide
|
||||
@@ -190,6 +235,12 @@ for i in $(seq 1 120); do
|
||||
curl -fsS "$TENANT_URL/health" -m 5 -k >/dev/null 2>&1 && { log " /health ok (attempt $i)"; break; }
|
||||
sleep 5
|
||||
done
|
||||
BUILDINFO=$(curl -fsS "$TENANT_URL/buildinfo" -m 10 2>/dev/null || true)
|
||||
if [ -n "$BUILDINFO" ]; then
|
||||
log " tenant buildinfo: $(echo "$BUILDINFO" | head -c 300)"
|
||||
else
|
||||
log " tenant buildinfo unavailable"
|
||||
fi
|
||||
|
||||
# ─── 4. Provision the parent + one sibling per runtime under test ──────
|
||||
# Inject the LLM provider key so each runtime can authenticate at boot.
|
||||
@@ -214,35 +265,49 @@ log " PARENT_ID=$PARENT_ID"
|
||||
# WS_IDS[runtime]=id ; WS_TOKENS[runtime]=auth_token (the MCP bearer)
|
||||
declare -A WS_IDS WS_TOKENS
|
||||
ALL_WS_IDS="$PARENT_ID"
|
||||
TOKEN_ERRORS=0
|
||||
TOKEN_ERROR_SUMMARY=""
|
||||
for rt in $PV_RUNTIMES; do
|
||||
R=$(tenant_call POST /workspaces \
|
||||
-d "{\"name\":\"pv-$rt\",\"runtime\":\"$rt\",\"tier\":2,\"parent_id\":\"$PARENT_ID\",\"secrets\":$SECRETS_JSON}")
|
||||
WID=$(echo "$R" | python3 -c "import sys,json; print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
|
||||
# auth_token is top-level for container runtimes; external-like nest it
|
||||
# under connection.auth_token (verified vs staging response shape).
|
||||
WTOK=$(echo "$R" | python3 -c "
|
||||
import sys, json
|
||||
try: d = json.load(sys.stdin)
|
||||
except Exception: print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or d.get('connection', {}).get('auth_token') or '')
|
||||
" 2>/dev/null)
|
||||
# External-like runtimes may return connection.auth_token on create.
|
||||
# Managed container runtimes usually return only id/status here, then
|
||||
# receive their bearer through registry/bootstrap; for this literal MCP
|
||||
# driver we mint through the production-safe admin token route below.
|
||||
WTOK=$(echo "$R" | extract_auth_token)
|
||||
[ -n "$WID" ] || fail "$rt workspace create failed: $(echo "$R" | head -c 300)"
|
||||
TOKEN_DIAG=""
|
||||
if [ -z "$WTOK" ]; then
|
||||
TTOK_RESP=$(tenant_call GET "/admin/workspaces/$WID/test-token" 2>/dev/null || true)
|
||||
WTOK=$(echo "$TTOK_RESP" | python3 -c "
|
||||
import sys, json
|
||||
try: d = json.load(sys.stdin)
|
||||
except Exception: print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or '')
|
||||
" 2>/dev/null)
|
||||
TTOK_FILE=$(mktemp)
|
||||
TTOK_CODE=$(tenant_call_capture POST "/admin/workspaces/$WID/tokens" "$TTOK_FILE" 2>/dev/null || echo "curl_error")
|
||||
TTOK_RESP=$(cat "$TTOK_FILE" 2>/dev/null || true)
|
||||
WTOK=$(echo "$TTOK_RESP" | extract_auth_token)
|
||||
TOKEN_DIAG="POST /admin/workspaces/$WID/tokens -> HTTP $TTOK_CODE body: $(echo "$TTOK_RESP" | redact_token_body)"
|
||||
rm -f "$TTOK_FILE"
|
||||
fi
|
||||
[ -n "$WTOK" ] || fail "$rt workspace did not return or mint an auth_token — cannot drive its MCP call (resp: $(echo "$R" | head -c 300))"
|
||||
WS_IDS[$rt]="$WID"
|
||||
if [ -z "$WTOK" ]; then
|
||||
TOKEN_ERRORS=$((TOKEN_ERRORS + 1))
|
||||
TOKEN_ERROR_SUMMARY="${TOKEN_ERROR_SUMMARY}
|
||||
[$rt] workspace did not return or mint an auth_token — cannot drive its MCP call (workspace_id=$WID; create_resp: $(echo "$R" | redact_token_body); token_fallbacks: $TOKEN_DIAG)"
|
||||
log " $rt → $WID (token acquisition failed; continuing to classify other runtimes)"
|
||||
continue
|
||||
fi
|
||||
WS_TOKENS[$rt]="$WTOK"
|
||||
ALL_WS_IDS="$ALL_WS_IDS $WID"
|
||||
log " $rt → $WID"
|
||||
done
|
||||
|
||||
if [ "$TOKEN_ERRORS" -gt 0 ]; then
|
||||
fail "token acquisition failed for $TOKEN_ERRORS runtime(s):$TOKEN_ERROR_SUMMARY"
|
||||
fi
|
||||
|
||||
if [ "${PV_TOKEN_DIAGNOSTIC_ONLY:-0}" = "1" ]; then
|
||||
ok "token diagnostic passed for runtimes: $PV_RUNTIMES"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# ─── 5. Wait for every sibling online ──────────────────────────────────
|
||||
log "5/6 waiting for all workspaces status=online (up to ${PROVISION_TIMEOUT_SECS}s — cold boot)..."
|
||||
WS_DEADLINE=$(( $(date +%s) + PROVISION_TIMEOUT_SECS ))
|
||||
|
||||
+22
@@ -0,0 +1,22 @@
|
||||
#!/usr/bin/env bash
|
||||
# Staging E2E diagnostic — classify peer-visibility token acquisition.
|
||||
#
|
||||
# This is intentionally narrower than test_peer_visibility_mcp_staging.sh:
|
||||
# it provisions the same throwaway org, creates managed sibling workspaces,
|
||||
# and stops immediately after auth_token acquisition. The default runtime set
|
||||
# compares hermes with claude-code so a failure is easy to classify:
|
||||
# - hermes fails, claude-code passes -> Hermes/runtime-specific
|
||||
# - both fail -> shared admin/auth/proxy route
|
||||
#
|
||||
# Required env matches test_peer_visibility_mcp_staging.sh:
|
||||
# MOLECULE_ADMIN_TOKEN
|
||||
# Optional:
|
||||
# MOLECULE_CP_URL, E2E_RUN_ID, PV_RUNTIMES, E2E_KEEP_ORG,
|
||||
# E2E_MINIMAX_API_KEY / E2E_ANTHROPIC_API_KEY / E2E_OPENAI_API_KEY
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
export PV_RUNTIMES="${PV_RUNTIMES:-hermes claude-code}"
|
||||
export PV_TOKEN_DIAGNOSTIC_ONLY=1
|
||||
|
||||
exec "$(dirname "${BASH_SOURCE[0]}")/test_peer_visibility_mcp_staging.sh"
|
||||
@@ -25,6 +25,11 @@
|
||||
# Optional env:
|
||||
# E2E_RUNTIME hermes (default) | claude-code | langgraph
|
||||
# E2E_PROVISION_TIMEOUT_SECS default 900 (15 min cold EC2 budget)
|
||||
# E2E_WORKSPACE_ONLINE_TIMEOUT_SECS default 3600 (60 min — hermes
|
||||
# cold-boot worst-case + slack). Raised from
|
||||
# 1800 (#1646) because flaky tenant-provisioning
|
||||
# latency (not a code regression) causes
|
||||
# alternating pass/fail on identical SHAs.
|
||||
# E2E_KEEP_ORG 1 → skip teardown (debugging only)
|
||||
# E2E_RUN_ID Slug suffix; CI: ${GITHUB_RUN_ID}
|
||||
# E2E_MODE full (default) | smoke
|
||||
@@ -32,6 +37,11 @@
|
||||
# mapped to `smoke` for back-compat with
|
||||
# any in-flight runner picking up an older
|
||||
# workflow checkout)
|
||||
# E2E_AWS_LEAK_CHECK auto (default) | required | off
|
||||
# required in CI so teardown cannot report
|
||||
# clean while slug-tagged EC2 remains alive
|
||||
# E2E_AWS_TERMINATE_LEAKS 1 → terminate slug-tagged leaked EC2 before
|
||||
# exiting 4
|
||||
# E2E_INTENTIONAL_FAILURE 1 → poison tenant token mid-run so the
|
||||
# script fails; the EXIT trap MUST still
|
||||
# tear down cleanly (and exit 4 on leak).
|
||||
@@ -51,6 +61,7 @@ CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
|
||||
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLECULE_ADMIN_TOKEN required — Railway staging CP_ADMIN_API_TOKEN}"
|
||||
RUNTIME="${E2E_RUNTIME:-hermes}"
|
||||
PROVISION_TIMEOUT_SECS="${E2E_PROVISION_TIMEOUT_SECS:-900}"
|
||||
WORKSPACE_ONLINE_TIMEOUT_SECS="${E2E_WORKSPACE_ONLINE_TIMEOUT_SECS:-3600}"
|
||||
RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}"
|
||||
MODE="${E2E_MODE:-full}"
|
||||
# `canary` is a legacy alias for `smoke` retained for back-compat with
|
||||
@@ -82,8 +93,12 @@ ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; }
|
||||
# Per-runtime model slug dispatch — see lib/model_slug.sh for the rationale.
|
||||
# Extracted so unit tests (tests/e2e/test_model_slug.sh) can pin every branch
|
||||
# without booting the full 11-step lifecycle.
|
||||
# shellcheck disable=SC1091
|
||||
# shellcheck source=lib/model_slug.sh
|
||||
source "$(dirname "$0")/lib/model_slug.sh"
|
||||
# shellcheck disable=SC1091
|
||||
# shellcheck source=lib/aws_leak_check.sh
|
||||
source "$(dirname "$0")/lib/aws_leak_check.sh"
|
||||
|
||||
CURL_COMMON=(-sS --fail-with-body --max-time 30)
|
||||
|
||||
@@ -119,12 +134,14 @@ cleanup_org() {
|
||||
# DELETE returns 5xx mid-cascade and the cascade finishes anyway,
|
||||
# and the case where DELETE legitimately exceeds 120s and we want
|
||||
# eventual-consistency confirmation.
|
||||
curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \
|
||||
if curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \
|
||||
-H "Authorization: Bearer $ADMIN_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \
|
||||
&& ok "Teardown request accepted" \
|
||||
|| log "Teardown returned non-2xx (may already be gone)"
|
||||
-d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1; then
|
||||
ok "Teardown request accepted"
|
||||
else
|
||||
log "Teardown returned non-2xx (may already be gone)"
|
||||
fi
|
||||
|
||||
local leak_count=1
|
||||
local elapsed=0
|
||||
@@ -144,7 +161,15 @@ cleanup_org() {
|
||||
echo "⚠️ LEAK: org $SLUG still present post-teardown after ${elapsed}s (count=$leak_count)" >&2
|
||||
exit 4
|
||||
fi
|
||||
ok "Teardown clean — no orphan resources for $SLUG (${elapsed}s)"
|
||||
local aws_leak_rc=0
|
||||
e2e_verify_no_ec2_leaks_for_slug "$SLUG" || aws_leak_rc=$?
|
||||
if [ "$aws_leak_rc" != "0" ]; then
|
||||
case "$aws_leak_rc" in
|
||||
2) exit 2 ;;
|
||||
*) exit 4 ;;
|
||||
esac
|
||||
fi
|
||||
ok "Teardown clean — no orphan org or EC2 resources for $SLUG (${elapsed}s)"
|
||||
|
||||
# Normalize unexpected upstream exit codes to 1 (generic failure). The
|
||||
# script's documented contract (header "Exit codes" section) only emits
|
||||
@@ -331,6 +356,75 @@ tenant_call() {
|
||||
"$@"
|
||||
}
|
||||
|
||||
sanitize_http_body() {
|
||||
python3 -c '
|
||||
import re, sys
|
||||
s = sys.stdin.read()
|
||||
s = re.sub(r"(?i)(Authorization:\s*Bearer\s+)[A-Za-z0-9._~+/=-]+", r"\1[redacted]", s)
|
||||
s = re.sub(r"(?i)(\"(?:auth_token|access_token|refresh_token|token|api_key|secret|password)\"\s*:\s*\")[^\"]+\"", r"\1[redacted]\"", s)
|
||||
s = re.sub(r"(?i)((?:auth_token|access_token|refresh_token|api_key|secret|password)=)[^&\s]+", r"\1[redacted]", s)
|
||||
print(s[:4000])
|
||||
'
|
||||
}
|
||||
|
||||
wait_workspaces_online_routable() {
|
||||
local label="$1"; shift
|
||||
local deadline=$(( $(date +%s) + WORKSPACE_ONLINE_TIMEOUT_SECS ))
|
||||
local wid ws_last_status ws_last_url ws_url_missing_logged ws_failed_logged
|
||||
local ws_json ws_status ws_url ws_last_err
|
||||
|
||||
log "$label"
|
||||
for wid in "$@"; do
|
||||
ws_last_status=""
|
||||
ws_last_url=""
|
||||
ws_url_missing_logged=0
|
||||
ws_failed_logged=0
|
||||
while true; do
|
||||
if [ "$(date +%s)" -gt "$deadline" ]; then
|
||||
ws_last_err=$(tenant_call GET "/workspaces/$wid" 2>/dev/null | \
|
||||
python3 -c "import json,sys; print(json.load(sys.stdin).get('last_sample_error',''))" 2>/dev/null || echo "")
|
||||
fail "Workspace $wid never reached online with a routable URL within ${WORKSPACE_ONLINE_TIMEOUT_SECS}s (~$((WORKSPACE_ONLINE_TIMEOUT_SECS/60)) min) (last status=$ws_last_status, url=$ws_last_url, err=$ws_last_err)"
|
||||
fi
|
||||
ws_json=$(tenant_call GET "/workspaces/$wid" 2>/dev/null || echo '{}')
|
||||
ws_status=$(echo "$ws_json" | python3 -c "import json,sys; print(json.load(sys.stdin).get('status') or '')" 2>/dev/null)
|
||||
ws_url=$(echo "$ws_json" | python3 -c "import json,sys; print(json.load(sys.stdin).get('url') or '')" 2>/dev/null)
|
||||
if [ "$ws_status" != "$ws_last_status" ]; then
|
||||
log " $wid → $ws_status"
|
||||
ws_last_status="$ws_status"
|
||||
fi
|
||||
if [ -n "$ws_url" ] && [ "$ws_url" != "$ws_last_url" ]; then
|
||||
log " $wid url ready: $ws_url"
|
||||
ws_last_url="$ws_url"
|
||||
fi
|
||||
case "$ws_status" in
|
||||
online)
|
||||
if [ -n "$ws_url" ]; then
|
||||
break
|
||||
fi
|
||||
if [ "$ws_url_missing_logged" = "0" ]; then
|
||||
log " $wid online but URL is not assigned yet — waiting for workspace routing readiness"
|
||||
ws_url_missing_logged=1
|
||||
fi
|
||||
sleep 10
|
||||
;;
|
||||
failed)
|
||||
# Not a hard fail — bootstrap-watcher frequently marks failed at
|
||||
# 5 min on hermes, then heartbeat recovers to online around 10-13
|
||||
# min when install.sh finishes. Log once per workspace so the CI
|
||||
# output isn't spammy.
|
||||
if [ "$ws_failed_logged" = "0" ]; then
|
||||
log " $wid transiently failed — waiting for heartbeat recovery (bootstrap-watcher deadline, see cp#245)"
|
||||
ws_failed_logged=1
|
||||
fi
|
||||
sleep 10
|
||||
;;
|
||||
*) sleep 10 ;;
|
||||
esac
|
||||
done
|
||||
ok " $wid online and routable"
|
||||
done
|
||||
}
|
||||
|
||||
# ─── 5. Provision parent workspace ─────────────────────────────────────
|
||||
# Inject the LLM provider key so the runtime can authenticate at boot.
|
||||
# Branch by which secret is set so the script supports multiple paths
|
||||
@@ -383,9 +477,9 @@ elif [ -n "${E2E_ANTHROPIC_API_KEY:-}" ]; then
|
||||
# is still independent of MOLECULE_STAGING_OPENAI_API_KEY, so an OpenAI
|
||||
# quota collapse doesn't wedge this path. Pinned to the claude-code
|
||||
# runtime: hermes/langgraph use OpenAI-shaped envs and won't honour
|
||||
# ANTHROPIC_API_KEY without further wiring (out of scope for this
|
||||
# branch; if you need a hermes/Anthropic path, dispatch with
|
||||
# E2E_RUNTIME=hermes + E2E_OPENAI_API_KEY pointing at a working key).
|
||||
# ANTHROPIC_API_KEY without further wiring. pick_model_slug maps this
|
||||
# branch to claude-sonnet-4-6 so the claude-code provider registry
|
||||
# selects anthropic-api instead of the OAuth-only sonnet alias.
|
||||
SECRETS_JSON=$(python3 -c "
|
||||
import json, os
|
||||
k = os.environ['E2E_ANTHROPIC_API_KEY']
|
||||
@@ -410,6 +504,7 @@ print(json.dumps({
|
||||
fi
|
||||
|
||||
MODEL_SLUG=$(pick_model_slug "$RUNTIME")
|
||||
log " MODEL_SLUG=$MODEL_SLUG"
|
||||
|
||||
log "5/11 Provisioning parent workspace (runtime=$RUNTIME)..."
|
||||
PARENT_RESP=$(tenant_call POST /workspaces \
|
||||
@@ -437,48 +532,16 @@ fi
|
||||
# deadline fires at 5 min and sets status=failed prematurely; heartbeat
|
||||
# then transitions failed → online after install.sh finishes. So:
|
||||
#
|
||||
# - 20 min deadline (hermes worst-case + slack)
|
||||
# - ${WORKSPACE_ONLINE_TIMEOUT_SECS}s (~$((WORKSPACE_ONLINE_TIMEOUT_SECS/60)) min)
|
||||
# deadline (hermes worst-case + slack). Configurable via
|
||||
# E2E_WORKSPACE_ONLINE_TIMEOUT_SECS (#1646).
|
||||
# - 'failed' is a TRANSIENT state we must tolerate — log and keep
|
||||
# polling, only hard-fail at the deadline. Pre-bootstrap-watcher-fix
|
||||
# (controlplane#245) this was a flake generator: workspace went
|
||||
# failed→online inside our window but we bailed at the failed read.
|
||||
log "7/11 Waiting for workspace(s) to reach status=online (up to 30 min — hermes cold boot)..."
|
||||
WS_DEADLINE=$(( $(date +%s) + 1800 ))
|
||||
WS_TO_CHECK="$PARENT_ID"
|
||||
[ -n "$CHILD_ID" ] && WS_TO_CHECK="$WS_TO_CHECK $CHILD_ID"
|
||||
for wid in $WS_TO_CHECK; do
|
||||
WS_LAST_STATUS=""
|
||||
WS_FAILED_LOGGED=0
|
||||
while true; do
|
||||
if [ "$(date +%s)" -gt "$WS_DEADLINE" ]; then
|
||||
WS_LAST_ERR=$(tenant_call GET "/workspaces/$wid" 2>/dev/null | \
|
||||
python3 -c "import json,sys; print(json.load(sys.stdin).get('last_sample_error',''))" 2>/dev/null || echo "")
|
||||
fail "Workspace $wid never reached online within 20 min (last status=$WS_LAST_STATUS, err=$WS_LAST_ERR)"
|
||||
fi
|
||||
WS_JSON=$(tenant_call GET "/workspaces/$wid" 2>/dev/null || echo '{}')
|
||||
WS_STATUS=$(echo "$WS_JSON" | python3 -c "import json,sys; print(json.load(sys.stdin).get('status',''))" 2>/dev/null)
|
||||
if [ "$WS_STATUS" != "$WS_LAST_STATUS" ]; then
|
||||
log " $wid → $WS_STATUS"
|
||||
WS_LAST_STATUS="$WS_STATUS"
|
||||
fi
|
||||
case "$WS_STATUS" in
|
||||
online) break ;;
|
||||
failed)
|
||||
# Not a hard fail — bootstrap-watcher frequently marks failed at
|
||||
# 5 min on hermes, then heartbeat recovers to online around 10-13
|
||||
# min when install.sh finishes. Log once per workspace so the CI
|
||||
# output isn't spammy.
|
||||
if [ "$WS_FAILED_LOGGED" = "0" ]; then
|
||||
log " $wid transiently failed — waiting for heartbeat recovery (bootstrap-watcher deadline, see cp#245)"
|
||||
WS_FAILED_LOGGED=1
|
||||
fi
|
||||
sleep 10
|
||||
;;
|
||||
*) sleep 10 ;;
|
||||
esac
|
||||
done
|
||||
ok " $wid online"
|
||||
done
|
||||
WS_TO_CHECK=("$PARENT_ID")
|
||||
[ -n "$CHILD_ID" ] && WS_TO_CHECK+=("$CHILD_ID")
|
||||
wait_workspaces_online_routable "7/11 Waiting for workspace(s) to reach status=online (up to $((WORKSPACE_ONLINE_TIMEOUT_SECS/60)) min — hermes cold boot)..." "${WS_TO_CHECK[@]}"
|
||||
|
||||
# ─── 7b. Canvas-terminal diagnose (EIC chain probe) ────────────────────
|
||||
# This step exists because the canvas-terminal failure of 2026-05-03
|
||||
@@ -490,7 +553,7 @@ done
|
||||
# - tenantIngressRules / workspaceIngressRules (CP)
|
||||
# - eicSSHIngressRule helper (CP)
|
||||
# - AuthorizeIngress source-group support (CP awsapi)
|
||||
# - EIC_ENDPOINT_SG_ID Railway env
|
||||
# - MOLECULE_EIC_ENDPOINT_SG_ID Railway env
|
||||
# - handleRemoteConnect's send-ssh-public-key/open-tunnel/ssh chain
|
||||
# surfaces within ~20 min of merge instead of waiting for a user report.
|
||||
#
|
||||
@@ -504,7 +567,7 @@ done
|
||||
# probes docker.Ping + container exec; we still expect ok=true there
|
||||
# since local-docker is the alternative production path.
|
||||
log "7b/11 Canvas-terminal EIC diagnose probe..."
|
||||
for wid in $WS_TO_CHECK; do
|
||||
for wid in "${WS_TO_CHECK[@]}"; do
|
||||
DIAG_JSON=$(tenant_call GET "/workspaces/$wid/terminal/diagnose" 2>/dev/null || echo '{}')
|
||||
DIAG_OK=$(echo "$DIAG_JSON" | python3 -c "import json,sys; d=json.load(sys.stdin); print('true' if d.get('ok') else 'false')" 2>/dev/null || echo "false")
|
||||
if [ "$DIAG_OK" = "true" ]; then
|
||||
@@ -512,7 +575,7 @@ for wid in $WS_TO_CHECK; do
|
||||
else
|
||||
DIAG_FAIL=$(echo "$DIAG_JSON" | python3 -c "import json,sys; d=json.load(sys.stdin); print(d.get('first_failure','unknown'))" 2>/dev/null || echo "unknown")
|
||||
DIAG_DETAIL=$(echo "$DIAG_JSON" | python3 -c "import json,sys; d=json.load(sys.stdin); s=[x for x in d.get('steps',[]) if not x.get('ok')]; step=s[0] if s else {}; print(' — '.join(x for x in [step.get('error',''), step.get('detail','')] if x))" 2>/dev/null || echo "")
|
||||
fail "Workspace $wid terminal diagnose failed at step '$DIAG_FAIL': $DIAG_DETAIL — check tenant SG has tcp/22 from EIC endpoint SG (sg-0785d5c6138220523), EIC_ENDPOINT_SG_ID set in Railway, and EIC endpoint health"
|
||||
fail "Workspace $wid terminal diagnose failed at step '$DIAG_FAIL': $DIAG_DETAIL — check tenant SG has tcp/22 from the configured EIC endpoint SG, MOLECULE_EIC_ENDPOINT_SG_ID is set in Railway, and EIC endpoint health"
|
||||
fi
|
||||
done
|
||||
|
||||
@@ -540,7 +603,7 @@ CONFIG_PAYLOAD="${CONFIG_MARKER}
|
||||
name: synth-canary
|
||||
runtime: ${RUNTIME}
|
||||
"
|
||||
for wid in $WS_TO_CHECK; do
|
||||
for wid in "${WS_TO_CHECK[@]}"; do
|
||||
PUT_BODY=$(python3 -c "import json,sys; print(json.dumps({'content': sys.stdin.read()}))" <<< "$CONFIG_PAYLOAD")
|
||||
# Capture body to a tempfile so curl's -w '%{http_code}' is the only
|
||||
# thing on stdout. The first version used `-w '\n%{http_code}\n'` and
|
||||
@@ -573,6 +636,12 @@ for wid in $WS_TO_CHECK; do
|
||||
ok " $wid config.yaml PUT OK (HTTP $PUT_CODE)"
|
||||
done
|
||||
|
||||
# Saving config.yaml follows the same path as Canvas Config Save & Restart.
|
||||
# The controlplane can briefly put the workspace back into provisioning and
|
||||
# clear its route while the runtime restarts, so A2A must wait on the same
|
||||
# externally routable readiness boundary again.
|
||||
wait_workspaces_online_routable "7d/11 Waiting for workspace(s) to recover routing after config.yaml PUT..." "${WS_TO_CHECK[@]}"
|
||||
|
||||
# ─── 8. A2A round-trip on parent ───────────────────────────────────────
|
||||
log "8/11 Sending A2A message to parent — expecting agent response..."
|
||||
# Smoke prompt phrasing — DO NOT trim back to the bare "Reply with exactly: PONG"
|
||||
@@ -612,10 +681,44 @@ print(json.dumps({
|
||||
# 90s gives ~3x headroom over observed cold-call P95 (~25-30s).
|
||||
# Subsequent A2A turns hit the same workspace and are sub-second, so
|
||||
# this only widens the window for step 8/11 of the canary's first turn.
|
||||
A2A_RESP=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \
|
||||
--max-time 90 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$A2A_PAYLOAD")
|
||||
A2A_TMP=$(mktemp -t synth_a2a.XXXXXX)
|
||||
for A2A_ATTEMPT in $(seq 1 12); do
|
||||
: >"$A2A_TMP"
|
||||
set +e
|
||||
A2A_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \
|
||||
--max-time 90 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$A2A_PAYLOAD" \
|
||||
-o "$A2A_TMP" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
A2A_RC=$?
|
||||
set -e
|
||||
A2A_CODE=${A2A_CODE:-000}
|
||||
A2A_RESP=$(cat "$A2A_TMP" 2>/dev/null || echo "")
|
||||
if [ "$A2A_RC" = "0" ] && [ "$A2A_CODE" -ge 200 ] && [ "$A2A_CODE" -lt 300 ]; then
|
||||
break
|
||||
fi
|
||||
|
||||
A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body)
|
||||
if echo "$A2A_CODE" | grep -Eq '^(502|503|504)$' && echo "$A2A_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then
|
||||
log " A2A cold-start probe attempt $A2A_ATTEMPT/12 returned $A2A_CODE: $A2A_SAFE_BODY"
|
||||
if [ "$A2A_ATTEMPT" -lt 12 ]; then
|
||||
A2A_SLEEP=10
|
||||
if echo "$A2A_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session'; then
|
||||
A2A_SLEEP=30
|
||||
fi
|
||||
sleep "$A2A_SLEEP"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
break
|
||||
done
|
||||
rm -f "$A2A_TMP"
|
||||
if [ "$A2A_RC" != "0" ] || [ "$A2A_CODE" -lt 200 ] || [ "$A2A_CODE" -ge 300 ]; then
|
||||
A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body)
|
||||
fail "A2A POST /workspaces/$PARENT_ID/a2a failed after $A2A_ATTEMPT attempt(s) (curl_rc=$A2A_RC, http=$A2A_CODE): $A2A_SAFE_BODY"
|
||||
fi
|
||||
AGENT_TEXT=$(echo "$A2A_RESP" | python3 -c "
|
||||
import json, sys
|
||||
d = json.load(sys.stdin)
|
||||
@@ -812,20 +915,50 @@ print(json.dumps({
|
||||
}
|
||||
}))
|
||||
")
|
||||
set +e
|
||||
# Raw curl (not tenant_call) because this call carries an extra
|
||||
# X-Source-Workspace-Id header. Must still send X-Molecule-Org-Id
|
||||
# or TenantGuard 404s — previously missing, caused section 10 to
|
||||
# fail rc=22 despite everything upstream being correct (2026-04-21).
|
||||
DELEG_RESP=$(curl "${CURL_COMMON[@]}" -X POST "$TENANT_URL/workspaces/$CHILD_ID/a2a" \
|
||||
-H "Authorization: Bearer $EFFECTIVE_TENANT_TOKEN" \
|
||||
-H "X-Molecule-Org-Id: $ORG_ID" \
|
||||
-H "X-Source-Workspace-Id: $PARENT_ID" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$DELEG_PAYLOAD")
|
||||
DELEG_RC=$?
|
||||
set -e
|
||||
[ $DELEG_RC -ne 0 ] && fail "Delegation A2A POST failed (rc=$DELEG_RC)"
|
||||
DELEG_TMP=$(mktemp -t deleg_a2a.XXXXXX)
|
||||
for DELEG_ATTEMPT in $(seq 1 12); do
|
||||
: >"$DELEG_TMP"
|
||||
set +e
|
||||
# Raw curl (not tenant_call) because this call carries an extra
|
||||
# X-Source-Workspace-Id header. Must still send X-Molecule-Org-Id
|
||||
# or TenantGuard 404s — previously missing, caused section 10 to
|
||||
# fail rc=22 despite everything upstream being correct (2026-04-21).
|
||||
DELEG_CODE=$(curl "${CURL_COMMON[@]}" -X POST "$TENANT_URL/workspaces/$CHILD_ID/a2a" \
|
||||
-H "Authorization: Bearer $EFFECTIVE_TENANT_TOKEN" \
|
||||
-H "X-Molecule-Org-Id: $ORG_ID" \
|
||||
-H "X-Source-Workspace-Id: $PARENT_ID" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$DELEG_PAYLOAD" \
|
||||
-o "$DELEG_TMP" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
DELEG_RC=$?
|
||||
set -e
|
||||
DELEG_CODE=${DELEG_CODE:-000}
|
||||
DELEG_RESP=$(cat "$DELEG_TMP" 2>/dev/null || echo "")
|
||||
if [ "$DELEG_RC" = "0" ] && [ "$DELEG_CODE" -ge 200 ] && [ "$DELEG_CODE" -lt 300 ]; then
|
||||
break
|
||||
fi
|
||||
|
||||
DELEG_SAFE_BODY=$(printf '%s' "$DELEG_RESP" | sanitize_http_body)
|
||||
if echo "$DELEG_CODE" | grep -Eq '^(502|503|504)$' && echo "$DELEG_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then
|
||||
log " Delegation A2A cold-start attempt $DELEG_ATTEMPT/12 returned $DELEG_CODE: $DELEG_SAFE_BODY"
|
||||
if [ "$DELEG_ATTEMPT" -lt 12 ]; then
|
||||
DELEG_SLEEP=10
|
||||
if echo "$DELEG_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session'; then
|
||||
DELEG_SLEEP=30
|
||||
fi
|
||||
sleep "$DELEG_SLEEP"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
break
|
||||
done
|
||||
rm -f "$DELEG_TMP"
|
||||
if [ "$DELEG_RC" != "0" ] || [ "$DELEG_CODE" -lt 200 ] || [ "$DELEG_CODE" -ge 300 ]; then
|
||||
DELEG_SAFE_BODY=$(printf '%s' "$DELEG_RESP" | sanitize_http_body)
|
||||
fail "Delegation A2A POST failed after $DELEG_ATTEMPT attempt(s) (curl_rc=$DELEG_RC, http=$DELEG_CODE): $DELEG_SAFE_BODY"
|
||||
fi
|
||||
DELEG_TEXT=$(echo "$DELEG_RESP" | python3 -c "
|
||||
import json, sys
|
||||
try:
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
|
||||
|
||||
def test_staging_e2e_workflows_use_stable_minimax_default() -> None:
|
||||
"""Keep cron/push E2E on the same MiniMax model as the smoke-tested script."""
|
||||
workflow_paths = [
|
||||
".gitea/workflows/e2e-staging-saas.yml",
|
||||
".gitea/workflows/staging-smoke.yml",
|
||||
".gitea/workflows/continuous-synth-e2e.yml",
|
||||
]
|
||||
|
||||
for rel in workflow_paths:
|
||||
text = (ROOT / rel).read_text()
|
||||
assert "MiniMax-M2.7-highspeed" not in text
|
||||
assert "MiniMax-M2" in text
|
||||
@@ -705,7 +705,7 @@ def test_ci_change_detector_docs_and_meta_scripts_do_not_trigger_surfaces():
|
||||
}
|
||||
|
||||
|
||||
def test_ci_platform_go_pr_steps_are_path_scoped():
|
||||
def test_ci_platform_go_steps_are_path_scoped_on_all_events():
|
||||
doc = yaml.safe_load(CI_WORKFLOW.read_text(encoding="utf-8"))
|
||||
platform = doc["jobs"]["platform-build"]
|
||||
assert platform.get("needs") == "changes"
|
||||
@@ -720,11 +720,11 @@ def test_ci_platform_go_pr_steps_are_path_scoped():
|
||||
assert expensive_steps
|
||||
for step in expensive_steps:
|
||||
expr = step.get("if", "")
|
||||
assert "github.event_name != 'pull_request'" in expr
|
||||
assert "needs.changes.outputs.platform == 'true'" in expr
|
||||
assert "github.event_name != 'pull_request'" not in expr
|
||||
|
||||
|
||||
def test_ci_canvas_nextjs_pr_steps_are_path_scoped():
|
||||
def test_ci_canvas_nextjs_steps_are_path_scoped_on_all_events():
|
||||
doc = yaml.safe_load(CI_WORKFLOW.read_text(encoding="utf-8"))
|
||||
canvas = doc["jobs"]["canvas-build"]
|
||||
assert canvas.get("needs") == "changes"
|
||||
@@ -739,11 +739,11 @@ def test_ci_canvas_nextjs_pr_steps_are_path_scoped():
|
||||
assert expensive_steps
|
||||
for step in expensive_steps:
|
||||
expr = step.get("if", "")
|
||||
assert "github.event_name != 'pull_request'" in expr
|
||||
assert "needs.changes.outputs.canvas == 'true'" in expr
|
||||
assert "github.event_name != 'pull_request'" not in expr
|
||||
|
||||
|
||||
def test_ci_shellcheck_pr_steps_are_path_scoped():
|
||||
def test_ci_shellcheck_steps_are_path_scoped_on_all_events():
|
||||
doc = yaml.safe_load(CI_WORKFLOW.read_text(encoding="utf-8"))
|
||||
shellcheck = doc["jobs"]["shellcheck"]
|
||||
assert shellcheck.get("needs") == "changes"
|
||||
@@ -756,5 +756,5 @@ def test_ci_shellcheck_pr_steps_are_path_scoped():
|
||||
assert expensive_steps
|
||||
for step in expensive_steps:
|
||||
expr = step.get("if", "")
|
||||
assert "github.event_name != 'pull_request'" in expr
|
||||
assert "needs.changes.outputs.scripts == 'true'" in expr
|
||||
assert "github.event_name != 'pull_request'" not in expr
|
||||
|
||||
@@ -56,6 +56,21 @@ SCRIPT_PATH = (
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _stub_time_sleep(monkeypatch):
|
||||
"""Autouse: stub time.sleep across every test.
|
||||
|
||||
The watchdog's RECHECK_DELAY_SECS (default 90s) is wired into
|
||||
run_once() via time.sleep(). Without this stub, integration-style
|
||||
tests that exercise run_once() would each block for 90s — a
|
||||
pre-fix `pytest -q` ran in ~0.1s; the unstubbed equivalent took
|
||||
>4 minutes (task #394 review evidence). Stubbing here keeps the
|
||||
suite fast and deterministic without requiring every red-path test
|
||||
to remember the patch.
|
||||
"""
|
||||
monkeypatch.setattr("time.sleep", lambda s: None)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def wd_module():
|
||||
"""Import the script as a module under a known env."""
|
||||
@@ -809,3 +824,214 @@ def test_require_runtime_env_exits_when_missing(wd_module, monkeypatch):
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
wd_module._require_runtime_env()
|
||||
assert excinfo.value.code == 2
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Action-run status filter + HEAD-recheck (task #394, mc#1597..1630)
|
||||
#
|
||||
# The existing cancel-cascade filter matched description=='Has been
|
||||
# cancelled' EXACTLY, but a 7-day DB sweep on 2026-05-20 showed that
|
||||
# only 76/702 (~11%) of action_run.status=3 (Cancelled) entries carry
|
||||
# that string — 89% are written as 'Failing after Ns', indistinguishable
|
||||
# from real action_run.status=2 (Failure) at the commit_status layer.
|
||||
#
|
||||
# Gitea 1.22.6 has NO REST endpoint exposing action_run.status, so the
|
||||
# canonical filter (status=2 only) cannot run from a Gitea Actions
|
||||
# runner. The next-best signal is the HEAD-recheck: re-fetch HEAD SHA
|
||||
# (or its combined status) right before filing. If HEAD moved on or
|
||||
# combined state recovered, the prior "red" was a transient
|
||||
# cancel-cascade and we skip-file.
|
||||
#
|
||||
# References:
|
||||
# - reference_chronic_red_sweep_cancelled_vs_failed_filter
|
||||
# - feedback_gitea_status_enum_use_helper_not_raw_int
|
||||
# - reference_gitea_action_status_enum_corrected_2026_05_19
|
||||
# - triage evidence 2026-05-21 04:55 (6 cancellation + 1 emission
|
||||
# artifact across mc#1597,1605,1609,1613,1626,1627,1630)
|
||||
# --------------------------------------------------------------------------
|
||||
def test_head_recheck_skips_file_when_head_moved(wd_module, monkeypatch, capsys):
|
||||
"""When initial tick sees red at SHA_A but HEAD has since moved to
|
||||
SHA_B (next commit landed mid-tick), the watchdog must NOT file.
|
||||
Re-evaluation happens on the next cron tick against the new SHA.
|
||||
|
||||
REGRESSION CLASS: this guards mc#1597..#1630 — 7 false-positives
|
||||
filed in 24h because cancel-cascade fired commit_status=failure
|
||||
rows on SHAs that were already superseded by new merges."""
|
||||
SHA_A = SHA_RED
|
||||
SHA_B = SHA_GREEN
|
||||
failed_ctx = [
|
||||
{"context": "ci/test", "status": "failure",
|
||||
"target_url": "/r/runs/100/jobs/0",
|
||||
"description": "Failing after 12s"},
|
||||
]
|
||||
# First branches read returns SHA_A; the second (recheck) returns SHA_B
|
||||
# → watchdog detects HEAD drift and skip-files.
|
||||
branches_responses = iter([
|
||||
(200, _branches_response(SHA_A)),
|
||||
(200, _branches_response(SHA_B)),
|
||||
])
|
||||
|
||||
def fake_api(method, path, *, body=None, query=None, expect_json=True):
|
||||
if method == "GET" and path == "/repos/owner/repo/branches/main":
|
||||
return next(branches_responses)
|
||||
if method == "GET" and path == f"/repos/owner/repo/commits/{SHA_A}/status":
|
||||
return (200, _combined_status("failure", failed_ctx))
|
||||
if method == "POST" and path == "/repos/owner/repo/issues":
|
||||
raise AssertionError(
|
||||
"watchdog filed a phantom issue despite HEAD moving away "
|
||||
"from the red SHA (regression: mc#1597..1630)"
|
||||
)
|
||||
if method == "GET" and path == "/repos/owner/repo/issues":
|
||||
return (200, [])
|
||||
raise AssertionError(f"unexpected api call: {method} {path}")
|
||||
|
||||
# Settling delay is no-op'd by the _stub_time_sleep autouse fixture.
|
||||
monkeypatch.setattr(wd_module, "api", fake_api)
|
||||
wd_module.run_once(dry_run=False)
|
||||
captured = capsys.readouterr()
|
||||
assert "head drift" in captured.out.lower() or "head moved" in captured.out.lower(), (
|
||||
f"expected a notice about HEAD drift, got: {captured.out!r}"
|
||||
)
|
||||
|
||||
|
||||
def test_head_recheck_skips_file_when_recheck_status_recovered(
|
||||
wd_module, monkeypatch, capsys,
|
||||
):
|
||||
"""When initial tick sees red at SHA, but the post-settling recheck
|
||||
on the SAME SHA shows combined status recovered (e.g. transient
|
||||
cancel-cascade rolled forward to success on retry), skip-file.
|
||||
|
||||
This catches the mid-flight cancel-cascade window — the second
|
||||
largest false-positive cluster in mc#1597..1630."""
|
||||
failed_ctx_initial = [
|
||||
{"context": "ci/test", "status": "failure",
|
||||
"target_url": "/r/runs/100/jobs/0",
|
||||
"description": "Failing after 12s"},
|
||||
]
|
||||
recovered_ctx = [
|
||||
{"context": "ci/test", "status": "success",
|
||||
"target_url": "/r/runs/100/jobs/0",
|
||||
"description": "Successful in 30s"},
|
||||
]
|
||||
# Same SHA across both branch reads; status flips from failure→success
|
||||
# between the two combined-status reads.
|
||||
status_responses = iter([
|
||||
(200, _combined_status("failure", failed_ctx_initial)),
|
||||
(200, _combined_status("success", recovered_ctx)),
|
||||
])
|
||||
|
||||
def fake_api(method, path, *, body=None, query=None, expect_json=True):
|
||||
if method == "GET" and path == "/repos/owner/repo/branches/main":
|
||||
return (200, _branches_response(SHA_RED))
|
||||
if method == "GET" and path == f"/repos/owner/repo/commits/{SHA_RED}/status":
|
||||
return next(status_responses)
|
||||
if method == "POST" and path == "/repos/owner/repo/issues":
|
||||
raise AssertionError(
|
||||
"watchdog filed a phantom issue despite combined status "
|
||||
"recovering on recheck (mid-flight cancel-cascade window)"
|
||||
)
|
||||
if method == "GET" and path == "/repos/owner/repo/issues":
|
||||
return (200, [])
|
||||
raise AssertionError(f"unexpected api call: {method} {path}")
|
||||
|
||||
monkeypatch.setattr(wd_module, "api", fake_api)
|
||||
wd_module.run_once(dry_run=False)
|
||||
captured = capsys.readouterr()
|
||||
assert "recovered" in captured.out.lower() or "settled" in captured.out.lower(), (
|
||||
f"expected a notice about post-settling recovery, got: {captured.out!r}"
|
||||
)
|
||||
|
||||
|
||||
def test_head_recheck_files_when_still_red_after_settling(
|
||||
wd_module, monkeypatch,
|
||||
):
|
||||
"""When BOTH the initial detection AND the post-settling recheck
|
||||
show the same SHA still red, file the issue. This is the genuine-
|
||||
failure path the watchdog is designed to surface.
|
||||
|
||||
Locks the over-filter: a future change that always-skips after
|
||||
recheck would dismiss real failures."""
|
||||
failed_ctx = [
|
||||
{"context": "ci/test", "status": "failure",
|
||||
"target_url": "/r/runs/100/jobs/0",
|
||||
"description": "Failing after 12s"},
|
||||
]
|
||||
post_filed = {"value": False}
|
||||
|
||||
def fake_api(method, path, *, body=None, query=None, expect_json=True):
|
||||
if method == "GET" and path == "/repos/owner/repo/branches/main":
|
||||
return (200, _branches_response(SHA_RED))
|
||||
if method == "GET" and path == f"/repos/owner/repo/commits/{SHA_RED}/status":
|
||||
return (200, _combined_status("failure", failed_ctx))
|
||||
if method == "GET" and path == "/repos/owner/repo/issues":
|
||||
return (200, [])
|
||||
if method == "GET" and path == "/repos/owner/repo/labels":
|
||||
return (200, [{"id": 9, "name": "tier:high"}])
|
||||
if method == "POST" and path == "/repos/owner/repo/issues":
|
||||
post_filed["value"] = True
|
||||
return (201, {"number": 999})
|
||||
if method == "POST" and path == "/repos/owner/repo/issues/999/labels":
|
||||
return (200, [])
|
||||
raise AssertionError(f"unexpected api call: {method} {path}")
|
||||
|
||||
monkeypatch.setattr(wd_module, "api", fake_api)
|
||||
wd_module.run_once(dry_run=False)
|
||||
assert post_filed["value"], (
|
||||
"genuine-failure path was skip-filed — head-recheck over-filter "
|
||||
"regression (would suppress all real main-red alarms)"
|
||||
)
|
||||
|
||||
|
||||
def test_head_recheck_skips_when_initial_was_only_cancel_cascade(
|
||||
wd_module, monkeypatch,
|
||||
):
|
||||
"""Belt-and-braces: combined-status failure caused exclusively by
|
||||
description='Has been cancelled' entries should still be filtered
|
||||
by the EXISTING cancel-cascade filter — head-recheck must not
|
||||
accidentally bypass it. Regression guard for the existing mc#1564
|
||||
fix."""
|
||||
failed_ctx = [
|
||||
{"context": "ci/test", "status": "failure",
|
||||
"description": "Has been cancelled"},
|
||||
]
|
||||
|
||||
def fake_api(method, path, *, body=None, query=None, expect_json=True):
|
||||
if method == "GET" and path == "/repos/owner/repo/branches/main":
|
||||
return (200, _branches_response(SHA_RED))
|
||||
if method == "GET" and path == f"/repos/owner/repo/commits/{SHA_RED}/status":
|
||||
return (200, _combined_status("failure", failed_ctx))
|
||||
if method == "POST" and path == "/repos/owner/repo/issues":
|
||||
raise AssertionError(
|
||||
"cancel-cascade-only entry must be filtered before any "
|
||||
"head-recheck logic runs"
|
||||
)
|
||||
if method == "GET" and path == "/repos/owner/repo/issues":
|
||||
return (200, [])
|
||||
# No commit-status recheck should happen because is_red() returned False
|
||||
raise AssertionError(f"unexpected api call: {method} {path}")
|
||||
|
||||
monkeypatch.setattr(wd_module, "api", fake_api)
|
||||
wd_module.run_once(dry_run=False)
|
||||
# success: no AssertionError raised, no POST
|
||||
|
||||
|
||||
def test_resolve_action_run_status_returns_none_on_no_endpoint(wd_module):
|
||||
"""The action_run.status REST endpoint does NOT exist in Gitea
|
||||
1.22.6 (verified empirically 2026-05-20 — /api/v1/.../actions/runs/N
|
||||
returns HTTP 404 across all probe variants). The resolver must
|
||||
return None gracefully so callers fall back to the description-
|
||||
string + head-recheck heuristics.
|
||||
|
||||
This pins the extensibility hook: when a future Gitea release (or
|
||||
an op-host proxy) exposes the endpoint, the resolver implementation
|
||||
can be swapped in without touching the caller contract."""
|
||||
# The function exists and is callable
|
||||
assert hasattr(wd_module, "_resolve_action_run_status")
|
||||
# A typical target_url shape from real Gitea commit_status rows:
|
||||
target_url = "/molecule-ai/molecule-core/actions/runs/75020/jobs/0"
|
||||
# Return None when no endpoint available
|
||||
out = wd_module._resolve_action_run_status(target_url)
|
||||
assert out is None, (
|
||||
"resolver must return None when the action_run.status endpoint "
|
||||
"isn't reachable — callers depend on the None-fallback path"
|
||||
)
|
||||
|
||||
@@ -442,6 +442,46 @@ def test_reap_preserves_real_push(sr_module, monkeypatch):
|
||||
assert calls == [] # NO POST
|
||||
|
||||
|
||||
def test_reap_compensates_cancelled_real_push_status(sr_module, monkeypatch):
|
||||
"""Gitea 1.22.6 maps cancelled push runs to failure statuses.
|
||||
|
||||
A real push workflow with description exactly "Has been cancelled"
|
||||
is cancel-cascade noise, not a defect signal. Status-reaper should
|
||||
compensate it even though the workflow has a push trigger.
|
||||
"""
|
||||
calls = []
|
||||
|
||||
def fake_api(method, path, *, body=None, query=None, expect_json=True):
|
||||
calls.append((method, path, body))
|
||||
return (201, {})
|
||||
|
||||
monkeypatch.setattr(sr_module, "api", fake_api)
|
||||
|
||||
workflow_map = {"ci": True}
|
||||
combined = {
|
||||
"state": "failure",
|
||||
"statuses": [
|
||||
{
|
||||
"context": "ci / test (push)",
|
||||
"status": "failure",
|
||||
"description": "Has been cancelled",
|
||||
"target_url": "https://example.test/actions/runs/1",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
counters = sr_module.reap(workflow_map, combined, SHA, dry_run=False)
|
||||
|
||||
assert counters["compensated"] == 1
|
||||
assert counters["compensated_cancelled_push"] == 1
|
||||
assert counters["preserved_real_push"] == 0
|
||||
assert len(calls) == 1
|
||||
assert calls[0][0] == "POST"
|
||||
assert calls[0][1] == f"/repos/owner/repo/statuses/{SHA}"
|
||||
assert calls[0][2]["context"] == "ci / test (push)"
|
||||
assert calls[0][2]["state"] == "success"
|
||||
|
||||
|
||||
def test_reap_preserves_unknown_workflow(sr_module, monkeypatch, capsys):
|
||||
"""Workflow not in map → ::notice:: + skip (conservative)."""
|
||||
monkeypatch.setattr(
|
||||
|
||||
@@ -686,11 +686,22 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri
|
||||
_ = db.CacheURL(ctx, workspaceID, agentURL)
|
||||
}
|
||||
|
||||
// When the platform runs inside Docker, 127.0.0.1:{host_port} is
|
||||
// unreachable (it's the platform container's own localhost, not the
|
||||
// Docker host). Rewrite to the container's Docker-bridge hostname.
|
||||
// When the platform runs inside Docker, a managed workspace's
|
||||
// 127.0.0.1:{host_port} URL points at the Docker host and must be
|
||||
// rewritten to the workspace container's Docker-bridge hostname.
|
||||
// External runtimes are not managed containers; their local test/runtime
|
||||
// URL is the target and must not be synthesized into ws-<id>:8000.
|
||||
if strings.HasPrefix(agentURL, "http://127.0.0.1:") && h.provisioner != nil && platformInDocker {
|
||||
agentURL = provisioner.InternalURL(workspaceID)
|
||||
var wsRuntime string
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`,
|
||||
workspaceID,
|
||||
).Scan(&wsRuntime); err != nil {
|
||||
log.Printf("ProxyA2A: runtime lookup before Docker URL rewrite failed for %s: %v", workspaceID, err)
|
||||
}
|
||||
if !isExternalLikeRuntime(wsRuntime) {
|
||||
agentURL = provisioner.InternalURL(workspaceID)
|
||||
}
|
||||
}
|
||||
// SSRF defence: reject private/metadata URLs before making outbound call.
|
||||
if err := isSafeURL(agentURL); err != nil {
|
||||
|
||||
@@ -1511,6 +1511,35 @@ func TestResolveAgentURL_DockerRewrite(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveAgentURL_ExternalRuntimeLoopbackNotRewrittenInDocker(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
handler.provisioner = &stubLocalProv{}
|
||||
|
||||
restore := setPlatformInDockerForTest(true)
|
||||
defer restore()
|
||||
|
||||
agentURL := "http://127.0.0.1:55555"
|
||||
mr.Set("ws:ws-external:url", agentURL)
|
||||
mock.ExpectQuery("SELECT COALESCE\\(runtime").
|
||||
WithArgs("ws-external").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("external"))
|
||||
|
||||
url, perr := handler.resolveAgentURL(context.Background(), "ws-external")
|
||||
if perr != nil {
|
||||
t.Fatalf("unexpected error: %+v", perr)
|
||||
}
|
||||
if url != agentURL {
|
||||
t.Errorf("external runtime loopback URL must not be rewritten; got %q want %q", url, agentURL)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- dispatchA2A direct unit tests ---
|
||||
|
||||
func TestDispatchA2A_BuildRequestError(t *testing.T) {
|
||||
|
||||
@@ -67,7 +67,213 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
|
||||
return &ActivityHandler{broadcaster: b}
|
||||
}
|
||||
|
||||
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
|
||||
// extractAttachmentsFromRequestBody walks a JSON-RPC a2a inbound body to
|
||||
// surface attachments (file/image/audio/video) as a flat `attachments[]`
|
||||
// projection so callers don't have to drill into the request_body shape
|
||||
// themselves.
|
||||
//
|
||||
// Two body shapes are walked in order:
|
||||
//
|
||||
// 1. a2a-sdk v1 message-part envelope (peer_agent inbound):
|
||||
//
|
||||
// {"jsonrpc":"2.0","method":"message/send","params":{
|
||||
// "message":{"parts":[
|
||||
// {"kind":"text", "text":"hi"},
|
||||
// {"kind":"file", "file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}},
|
||||
// {"kind":"image","file":{"uri":"workspace:bar.png","mime_type":"image/png","name":"bar.png"}},
|
||||
// ]}}}
|
||||
//
|
||||
// 2. canvas chat_upload_receive flat manifest (canvas_user upload):
|
||||
//
|
||||
// {"uri":"platform-pending:<ws>/<file>",
|
||||
// "name":"pasted.png",
|
||||
// "size":12345,
|
||||
// "file_id":"<uuid>",
|
||||
// "mimeType":"image/png"}
|
||||
//
|
||||
// The canvas upload pipe writes a single manifest directly at the
|
||||
// root of request_body (no JSON-RPC envelope) with camelCase
|
||||
// `mimeType`. We normalize to snake_case `mime_type` on the way out
|
||||
// so every downstream adaptor (channel / telegram / codex / hermes)
|
||||
// sees one wire shape regardless of which inbound shape produced it.
|
||||
//
|
||||
// Returns nil (omit-from-JSON) when the body has no attachments — the
|
||||
// `?include=peer_info` envelope projects this as an array iff non-empty.
|
||||
//
|
||||
// Defensive on every step: any missing key / wrong-shape value falls
|
||||
// through to the next arm or returns nil instead of panicking. The
|
||||
// activity_logs row could carry literally any JSON in request_body
|
||||
// (legacy formats, future formats); we only commit to the documented
|
||||
// shapes and silently skip anything else.
|
||||
func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} {
|
||||
if len(raw) == 0 {
|
||||
return nil
|
||||
}
|
||||
var body map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &body); err != nil {
|
||||
return nil
|
||||
}
|
||||
if atts := extractAttachmentsFromMessageParts(body); len(atts) > 0 {
|
||||
return atts
|
||||
}
|
||||
if att := extractAttachmentFromFlatUploadManifest(body); att != nil {
|
||||
return []map[string]interface{}{att}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractAttachmentsFromMessageParts handles the a2a-sdk v1 shape:
|
||||
// body.params.message.parts[]. Walks file/image/audio parts; honors v1
|
||||
// `kind` and v0 `type` discriminators; accepts nested `.file` sub-object
|
||||
// or inlined uri/mime_type/name on the part itself.
|
||||
func extractAttachmentsFromMessageParts(body map[string]interface{}) []map[string]interface{} {
|
||||
params, ok := body["params"].(map[string]interface{})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
message, ok := params["message"].(map[string]interface{})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
parts, ok := message["parts"].([]interface{})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
out := make([]map[string]interface{}, 0)
|
||||
for _, p := range parts {
|
||||
part, ok := p.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// a2a-sdk v1 uses "kind"; older v0 callers sent "type". Accept
|
||||
// both for the discriminator — same defensive read pattern as
|
||||
// the runtime-side extract_text helper.
|
||||
kind, _ := part["kind"].(string)
|
||||
if kind == "" {
|
||||
kind, _ = part["type"].(string)
|
||||
}
|
||||
if kind != "file" && kind != "image" && kind != "audio" {
|
||||
continue
|
||||
}
|
||||
// The file sub-object holds uri/mime_type/name. The a2a-sdk v1
|
||||
// shape nests under "file"; some legacy payloads inlined the
|
||||
// fields onto the part itself. Support both.
|
||||
var fileObj map[string]interface{}
|
||||
if f, ok := part["file"].(map[string]interface{}); ok {
|
||||
fileObj = f
|
||||
} else {
|
||||
fileObj = part
|
||||
}
|
||||
uri, _ := fileObj["uri"].(string)
|
||||
mimeType, _ := fileObj["mime_type"].(string)
|
||||
name, _ := fileObj["name"].(string)
|
||||
// At minimum we need either a uri or a name to be useful.
|
||||
// Empty-part entries are skipped (they're a malformed inbound
|
||||
// — surface nothing rather than emit a no-info placeholder).
|
||||
if uri == "" && name == "" {
|
||||
continue
|
||||
}
|
||||
att := map[string]interface{}{"kind": kind}
|
||||
if uri != "" {
|
||||
att["uri"] = uri
|
||||
}
|
||||
if mimeType != "" {
|
||||
att["mime_type"] = mimeType
|
||||
}
|
||||
if name != "" {
|
||||
att["name"] = name
|
||||
}
|
||||
out = append(out, att)
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// extractAttachmentFromFlatUploadManifest handles the canvas
|
||||
// chat_upload_receive shape: a single upload manifest at the root of
|
||||
// request_body with no JSON-RPC envelope. Canvas uses camelCase
|
||||
// `mimeType`; we normalize to snake_case `mime_type` on emit so the
|
||||
// wire shape matches the message-parts arm. Kind is derived from the
|
||||
// mime prefix (image/* → "image", audio/* → "audio", video/* → "video",
|
||||
// anything else → "file") because the canvas upload row doesn't carry
|
||||
// an explicit discriminator. Returns nil if neither `uri` nor `file_id`
|
||||
// is present at the root (i.e. not a flat upload manifest).
|
||||
func extractAttachmentFromFlatUploadManifest(body map[string]interface{}) map[string]interface{} {
|
||||
uri, _ := body["uri"].(string)
|
||||
fileID, _ := body["file_id"].(string)
|
||||
if uri == "" && fileID == "" {
|
||||
return nil
|
||||
}
|
||||
mimeType, _ := body["mimeType"].(string)
|
||||
if mimeType == "" {
|
||||
// Defensive: future canvas versions might emit snake_case directly.
|
||||
mimeType, _ = body["mime_type"].(string)
|
||||
}
|
||||
name, _ := body["name"].(string)
|
||||
// Apply the same minimum-info rule as the message-parts arm: a
|
||||
// manifest with neither uri nor name is non-actionable; skip.
|
||||
if uri == "" && name == "" {
|
||||
return nil
|
||||
}
|
||||
att := map[string]interface{}{"kind": kindFromMimeType(mimeType)}
|
||||
if uri != "" {
|
||||
att["uri"] = uri
|
||||
}
|
||||
if mimeType != "" {
|
||||
att["mime_type"] = mimeType
|
||||
}
|
||||
if name != "" {
|
||||
att["name"] = name
|
||||
}
|
||||
return att
|
||||
}
|
||||
|
||||
// kindFromMimeType derives the attachment `kind` discriminator from a
|
||||
// MIME type. Used by the flat-upload-manifest arm where the source row
|
||||
// has no explicit kind field.
|
||||
func kindFromMimeType(mime string) string {
|
||||
switch {
|
||||
case strings.HasPrefix(mime, "image/"):
|
||||
return "image"
|
||||
case strings.HasPrefix(mime, "audio/"):
|
||||
return "audio"
|
||||
case strings.HasPrefix(mime, "video/"):
|
||||
return "video"
|
||||
default:
|
||||
return "file"
|
||||
}
|
||||
}
|
||||
|
||||
// includeFlagSet returns true iff `flag` appears in the comma-separated
|
||||
// `?include=` query value. Whitespace around entries is tolerated.
|
||||
// Empty `include` returns false (existing back-compat shape).
|
||||
//
|
||||
// The comma-separable form lets future fields ("attachments_only",
|
||||
// "tool_trace_expanded", etc.) slot in without further URL-param creep.
|
||||
func includeFlagSet(includeQuery, flag string) bool {
|
||||
if includeQuery == "" || flag == "" {
|
||||
return false
|
||||
}
|
||||
for _, raw := range strings.Split(includeQuery, ",") {
|
||||
if strings.TrimSpace(raw) == flag {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=&include=
|
||||
//
|
||||
// The `include` query param is comma-separable; today the only flag is
|
||||
// `peer_info`, which enriches a2a_receive rows with `peer_name`,
|
||||
// `peer_role`, `agent_card_url`, and an `attachments[]` projection (see
|
||||
// extractAttachmentsFromRequestBody). It's additive + opt-in — existing
|
||||
// callers that don't pass `?include=peer_info` see the unchanged shape.
|
||||
// Surface for the layered enrichment that lets Claude Code channel
|
||||
// pushes carry full sender identity instead of bare UUIDs (sibling
|
||||
// repos: molecule-ai-workspace-runtime + molecule-mcp-claude-channel).
|
||||
//
|
||||
// since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'.
|
||||
// Optional, additive — callers that don't pass it get today's behavior (the
|
||||
@@ -102,6 +308,8 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
sinceSecsStr := c.Query("since_secs")
|
||||
sinceID := c.Query("since_id")
|
||||
beforeTSStr := c.Query("before_ts") // optional RFC3339 — return rows strictly older than this timestamp
|
||||
include := c.Query("include") // comma-separated; today's only flag is "peer_info"
|
||||
includePeerInfo := includeFlagSet(include, "peer_info")
|
||||
|
||||
// Validate peer_id as a UUID at the trust boundary so a malformed
|
||||
// caller (the agent or a downstream MCP tool) can't smuggle SQL
|
||||
@@ -192,22 +400,60 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
usingCursor = true
|
||||
}
|
||||
|
||||
// Build query with optional filters
|
||||
query := `SELECT id, workspace_id, activity_type, source_id, target_id, method,
|
||||
summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at
|
||||
FROM activity_logs WHERE workspace_id = $1`
|
||||
// Build query with optional filters. When ?include=peer_info is set,
|
||||
// LEFT JOIN workspaces ON activity_logs.source_id = w.id so we can
|
||||
// surface w.name + w.role on the row. LEFT (not INNER) is required
|
||||
// for two reasons:
|
||||
// 1. Canvas rows have source_id IS NULL — those must still appear
|
||||
// in the result set (with NULL peer_name/peer_role).
|
||||
// 2. A peer workspace may have been deleted since the row was
|
||||
// written (no FK constraint on activity_logs.source_id) —
|
||||
// LEFT JOIN preserves the activity row with NULL peer fields
|
||||
// rather than silently dropping the row.
|
||||
//
|
||||
// agent_card_url is NOT pulled from the workspaces table; it's
|
||||
// computed server-side from externalPlatformURL + source_id at
|
||||
// projection time (mirrors molecule-ai-workspace-runtime
|
||||
// a2a_client._agent_card_url_for which constructs
|
||||
// {PLATFORM_URL}/registry/discover/{peer_id}).
|
||||
//
|
||||
// Column qualification (`activity_logs.<col>`) is added ONLY when
|
||||
// the JOIN is present — disambiguates `id` / `created_at` which
|
||||
// exist in both tables. When the JOIN is absent, unqualified
|
||||
// column references preserve the exact wire-shape existing callers
|
||||
// + existing test fixtures expect (back-compat).
|
||||
actCol := ""
|
||||
if includePeerInfo {
|
||||
actCol = "activity_logs."
|
||||
}
|
||||
selectClause := `SELECT ` + actCol + `id, ` + actCol + `workspace_id, ` + actCol + `activity_type, ` +
|
||||
actCol + `source_id, ` + actCol + `target_id, ` + actCol + `method, ` +
|
||||
actCol + `summary, ` + actCol + `request_body, ` + actCol + `response_body, ` +
|
||||
actCol + `tool_trace, ` + actCol + `duration_ms, ` + actCol + `status, ` +
|
||||
actCol + `error_detail, ` + actCol + `created_at`
|
||||
fromClause := ` FROM activity_logs`
|
||||
if includePeerInfo {
|
||||
selectClause += `, w.name AS peer_name, w.role AS peer_role`
|
||||
fromClause += ` LEFT JOIN workspaces w ON w.id = activity_logs.source_id`
|
||||
}
|
||||
query := selectClause + fromClause + ` WHERE ` + actCol + `workspace_id = $1`
|
||||
args := []interface{}{workspaceID}
|
||||
argIdx := 2
|
||||
|
||||
// WHERE/ORDER column refs use the same `actCol` qualifier prefix
|
||||
// computed above — empty string when no JOIN (back-compat with
|
||||
// existing wire shape + sqlmock-regex test fixtures), or
|
||||
// `activity_logs.` when LEFT JOIN'd (disambiguates `id` /
|
||||
// `created_at` between the two tables).
|
||||
if activityType != "" {
|
||||
query += fmt.Sprintf(" AND activity_type = $%d", argIdx)
|
||||
query += fmt.Sprintf(" AND "+actCol+"activity_type = $%d", argIdx)
|
||||
args = append(args, activityType)
|
||||
argIdx++
|
||||
}
|
||||
if source == "canvas" {
|
||||
query += " AND source_id IS NULL"
|
||||
query += " AND " + actCol + "source_id IS NULL"
|
||||
} else if source == "agent" {
|
||||
query += " AND source_id IS NOT NULL"
|
||||
query += " AND " + actCol + "source_id IS NOT NULL"
|
||||
} else if source != "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "source must be 'canvas' or 'agent'"})
|
||||
return
|
||||
@@ -224,7 +470,7 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// and avoids duplicate parameter binding (some drivers reject the
|
||||
// same arg slot reused, ours is fine but the explicit form is
|
||||
// clearer to read and matches the rest of the builder.)
|
||||
query += fmt.Sprintf(" AND (source_id = $%d OR target_id = $%d)", argIdx, argIdx)
|
||||
query += fmt.Sprintf(" AND ("+actCol+"source_id = $%d OR "+actCol+"target_id = $%d)", argIdx, argIdx)
|
||||
args = append(args, peerID)
|
||||
argIdx++
|
||||
}
|
||||
@@ -232,7 +478,7 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// Strictly older — never replay a row with the exact same
|
||||
// timestamp, mirrors the `created_at > cursorTime` shape
|
||||
// `since_id` uses for forward paging.
|
||||
query += fmt.Sprintf(" AND created_at < $%d", argIdx)
|
||||
query += fmt.Sprintf(" AND "+actCol+"created_at < $%d", argIdx)
|
||||
args = append(args, beforeTS)
|
||||
argIdx++
|
||||
}
|
||||
@@ -241,13 +487,13 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// interpolated into the SQL string. `make_interval(secs => $N)`
|
||||
// avoids the lib/pq quirk where INTERVAL '$N seconds' won't
|
||||
// substitute a placeholder inside the literal.
|
||||
query += fmt.Sprintf(" AND created_at >= NOW() - make_interval(secs => $%d)", argIdx)
|
||||
query += fmt.Sprintf(" AND "+actCol+"created_at >= NOW() - make_interval(secs => $%d)", argIdx)
|
||||
args = append(args, sinceSecs)
|
||||
argIdx++
|
||||
}
|
||||
if usingCursor {
|
||||
// Strictly after — never replay the cursor row itself.
|
||||
query += fmt.Sprintf(" AND created_at > $%d", argIdx)
|
||||
query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx)
|
||||
args = append(args, cursorTime)
|
||||
argIdx++
|
||||
}
|
||||
@@ -257,9 +503,9 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// since_id) keeps DESC — that's the canvas/UI shape and changing it
|
||||
// would surprise existing callers.
|
||||
if usingCursor {
|
||||
query += fmt.Sprintf(" ORDER BY created_at ASC LIMIT $%d", argIdx)
|
||||
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx)
|
||||
} else {
|
||||
query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx)
|
||||
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx)
|
||||
}
|
||||
args = append(args, limit)
|
||||
|
||||
@@ -272,6 +518,14 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// agent_card_url base computed once per request so we don't pay the
|
||||
// header-read cost per row. Only meaningful when includePeerInfo is
|
||||
// set; the empty string here is harmless when the flag is off.
|
||||
var platformBase string
|
||||
if includePeerInfo {
|
||||
platformBase = externalPlatformURL(c)
|
||||
}
|
||||
|
||||
activities := make([]map[string]interface{}, 0)
|
||||
for rows.Next() {
|
||||
var id, wsID, actType, status string
|
||||
@@ -279,10 +533,23 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
var reqBody, respBody, toolTrace []byte
|
||||
var durationMs *int
|
||||
var createdAt time.Time
|
||||
// LEFT JOIN'd peer columns — pointer-string so a NULL row
|
||||
// (canvas message OR deleted peer workspace) decodes as nil
|
||||
// rather than empty-string. Only scanned when includePeerInfo
|
||||
// is set (matched against the SELECT clause above).
|
||||
var peerName, peerRole *string
|
||||
|
||||
if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
|
||||
&summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil {
|
||||
log.Printf("Activity scan error: %v", err)
|
||||
var scanErr error
|
||||
if includePeerInfo {
|
||||
scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
|
||||
&summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt,
|
||||
&peerName, &peerRole)
|
||||
} else {
|
||||
scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
|
||||
&summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt)
|
||||
}
|
||||
if scanErr != nil {
|
||||
log.Printf("Activity scan error: %v", scanErr)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -308,6 +575,39 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
if toolTrace != nil {
|
||||
entry["tool_trace"] = json.RawMessage(toolTrace)
|
||||
}
|
||||
|
||||
// peer_info enrichment (per ?include=peer_info). Only emit the
|
||||
// new fields when the flag is set — back-compat for callers
|
||||
// that don't request it.
|
||||
if includePeerInfo {
|
||||
// peer_name / peer_role: emit only when present (canvas
|
||||
// rows have source_id IS NULL → peer_name is NULL by JOIN;
|
||||
// also a peer workspace may have been deleted since the
|
||||
// row was written → same NULL outcome). Omit-when-absent
|
||||
// matches the Layer 3 adaptor's "spread when present"
|
||||
// pattern; canvas_user rows legitimately have no peer_*.
|
||||
if peerName != nil && *peerName != "" {
|
||||
entry["peer_name"] = *peerName
|
||||
}
|
||||
if peerRole != nil && *peerRole != "" {
|
||||
entry["peer_role"] = *peerRole
|
||||
}
|
||||
// agent_card_url: constructed server-side from
|
||||
// externalPlatformURL + source_id. Mirrors the runtime-
|
||||
// side helper a2a_client._agent_card_url_for which builds
|
||||
// {PLATFORM_URL}/registry/discover/{peer_id}. Only set
|
||||
// when source_id is present + non-empty.
|
||||
if sourceID != nil && *sourceID != "" && platformBase != "" {
|
||||
entry["agent_card_url"] = platformBase + "/registry/discover/" + *sourceID
|
||||
}
|
||||
// attachments: flatten file/image/audio parts from the
|
||||
// request_body. nil when none — only project when
|
||||
// non-empty so the omit-when-absent rule holds.
|
||||
if atts := extractAttachmentsFromRequestBody(reqBody); len(atts) > 0 {
|
||||
entry["attachments"] = atts
|
||||
}
|
||||
}
|
||||
|
||||
activities = append(activities, entry)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
|
||||
@@ -0,0 +1,701 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Tests for the `?include=peer_info` activity-feed enrichment.
|
||||
//
|
||||
// The enrichment is additive + opt-in. When the flag is absent, the
|
||||
// existing tests (TestActivityList_SourceCanvas, etc.) prove the wire
|
||||
// shape is unchanged. These tests prove:
|
||||
// - When the flag IS set, the LEFT JOIN is issued and the SELECT
|
||||
// adds w.name + w.role.
|
||||
// - peer_name / peer_role surface from the joined row.
|
||||
// - agent_card_url is composed server-side from
|
||||
// externalPlatformURL + source_id and appears for non-canvas rows
|
||||
// (source_id present).
|
||||
// - attachments[] is projected from request_body.params.message.parts
|
||||
// for file/image/audio parts.
|
||||
// - Canvas rows (source_id NULL) do NOT get peer_name / peer_role /
|
||||
// agent_card_url, but DO still appear in the result set (LEFT JOIN
|
||||
// preserves them with NULL peer fields).
|
||||
// - The `include` query param is comma-separable and only recognizes
|
||||
// known flags.
|
||||
|
||||
// ---------- includeFlagSet helper unit tests ----------
|
||||
|
||||
func TestIncludeFlagSet(t *testing.T) {
|
||||
cases := []struct {
|
||||
query string
|
||||
flag string
|
||||
want bool
|
||||
}{
|
||||
{"", "peer_info", false},
|
||||
{"peer_info", "peer_info", true},
|
||||
{"peer_info,attachments", "peer_info", true},
|
||||
{"attachments,peer_info", "peer_info", true},
|
||||
{"attachments , peer_info ", "peer_info", true},
|
||||
{"peer_infos", "peer_info", false},
|
||||
{"peerinfo", "peer_info", false},
|
||||
{"peer_info", "", false},
|
||||
{",,", "peer_info", false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
got := includeFlagSet(tc.query, tc.flag)
|
||||
if got != tc.want {
|
||||
t.Errorf("includeFlagSet(%q, %q) = %v, want %v", tc.query, tc.flag, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- extractAttachmentsFromRequestBody unit tests ----------
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_Empty(t *testing.T) {
|
||||
if got := extractAttachmentsFromRequestBody(nil); got != nil {
|
||||
t.Errorf("nil body: want nil, got %v", got)
|
||||
}
|
||||
if got := extractAttachmentsFromRequestBody([]byte("")); got != nil {
|
||||
t.Errorf("empty body: want nil, got %v", got)
|
||||
}
|
||||
if got := extractAttachmentsFromRequestBody([]byte("not json")); got != nil {
|
||||
t.Errorf("non-json body: want nil, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_NoAttachments(t *testing.T) {
|
||||
// Text-only message: no file/image/audio parts → nil
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`)
|
||||
if got := extractAttachmentsFromRequestBody(body); got != nil {
|
||||
t.Errorf("text-only: want nil, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FileKindV1(t *testing.T) {
|
||||
// a2a-sdk v1 shape: kind=file, file:{uri,mime_type,name}
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
{"kind":"text","text":"see attached"},
|
||||
{"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}
|
||||
]}}}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["kind"] != "file" {
|
||||
t.Errorf("kind: want file, got %v", atts[0]["kind"])
|
||||
}
|
||||
if atts[0]["uri"] != "workspace:foo.pdf" {
|
||||
t.Errorf("uri mismatch: %v", atts[0]["uri"])
|
||||
}
|
||||
if atts[0]["mime_type"] != "application/pdf" {
|
||||
t.Errorf("mime_type mismatch: %v", atts[0]["mime_type"])
|
||||
}
|
||||
if atts[0]["name"] != "foo.pdf" {
|
||||
t.Errorf("name mismatch: %v", atts[0]["name"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) {
|
||||
// Mixed image + audio parts; both surface
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
{"kind":"image","file":{"uri":"workspace:a.png","mime_type":"image/png","name":"a.png"}},
|
||||
{"kind":"audio","file":{"uri":"workspace:b.mp3","mime_type":"audio/mpeg","name":"b.mp3"}}
|
||||
]}}}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 2 {
|
||||
t.Fatalf("want 2 attachments, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["kind"] != "image" || atts[1]["kind"] != "audio" {
|
||||
t.Errorf("kind order: got %v / %v", atts[0]["kind"], atts[1]["kind"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) {
|
||||
// Legacy v0 shape: type=file (not kind), inlined fields (no nested .file)
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
{"type":"file","uri":"workspace:legacy.txt","mime_type":"text/plain","name":"legacy.txt"}
|
||||
]}}}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["kind"] != "file" || atts[0]["uri"] != "workspace:legacy.txt" || atts[0]["name"] != "legacy.txt" {
|
||||
t.Errorf("v0 part not surfaced: %v", atts[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_SkipsEmptyParts(t *testing.T) {
|
||||
// A "file" part with no uri AND no name is malformed — skip rather
|
||||
// than emit a no-info entry.
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
{"kind":"file","file":{}},
|
||||
{"kind":"file","file":{"name":"only-name.bin"}}
|
||||
]}}}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment (the named one), got %d", len(atts))
|
||||
}
|
||||
if atts[0]["name"] != "only-name.bin" {
|
||||
t.Errorf("expected only-name.bin, got %v", atts[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_MalformedShape(t *testing.T) {
|
||||
// Various malformed shapes return nil (defensive)
|
||||
for _, b := range []string{
|
||||
`{}`,
|
||||
`{"params":{}}`,
|
||||
`{"params":{"message":{}}}`,
|
||||
`{"params":{"message":{"parts":"not-a-list"}}}`,
|
||||
`{"params":{"message":{"parts":[null,42,"string"]}}}`,
|
||||
} {
|
||||
if got := extractAttachmentsFromRequestBody([]byte(b)); got != nil {
|
||||
t.Errorf("body %q: want nil, got %v", b, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Activity List ?include=peer_info handler tests ----------
|
||||
|
||||
func TestActivityList_IncludePeerInfo_IssuesLeftJoin(t *testing.T) {
|
||||
// When ?include=peer_info is set, the query must:
|
||||
// 1. SELECT include w.name + w.role aliased as peer_name/peer_role
|
||||
// 2. FROM contains LEFT JOIN workspaces w ON w.id = activity_logs.source_id
|
||||
// 3. WHERE uses qualified activity_logs.workspace_id (disambiguates
|
||||
// from workspaces.id post-JOIN)
|
||||
//
|
||||
// Pin all three so a future refactor can't silently drop the JOIN or
|
||||
// the alias and have the test still pass.
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
peerID := "11111111-2222-3333-4444-555555555555"
|
||||
mock.ExpectQuery(
|
||||
`SELECT .+w\.name AS peer_name, w\.role AS peer_role FROM activity_logs LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id WHERE activity_logs\.workspace_id = .+`,
|
||||
).
|
||||
WithArgs("ws-1", 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
"peer_name", "peer_role",
|
||||
}).
|
||||
AddRow("act-1", "ws-1", "a2a_receive", peerID, "ws-1",
|
||||
"message/send", "Agent message: hello",
|
||||
[]byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hello"}]}}}`),
|
||||
nil, nil, nil, "ok", nil, time.Now(),
|
||||
"Production Manager", "product manager"))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
|
||||
c.Request.Host = "platform.test"
|
||||
c.Request.Header.Set("X-Forwarded-Proto", "https")
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(resp) != 1 {
|
||||
t.Fatalf("want 1 row, got %d", len(resp))
|
||||
}
|
||||
r := resp[0]
|
||||
if r["peer_name"] != "Production Manager" {
|
||||
t.Errorf("peer_name: got %v", r["peer_name"])
|
||||
}
|
||||
if r["peer_role"] != "product manager" {
|
||||
t.Errorf("peer_role: got %v", r["peer_role"])
|
||||
}
|
||||
wantURL := "https://platform.test/registry/discover/" + peerID
|
||||
if r["agent_card_url"] != wantURL {
|
||||
t.Errorf("agent_card_url: got %v, want %v", r["agent_card_url"], wantURL)
|
||||
}
|
||||
// Text-only message has no attachments → omit from envelope
|
||||
if _, present := r["attachments"]; present {
|
||||
t.Errorf("attachments should be omitted on text-only row; got %v", r["attachments"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_IncludePeerInfo_CanvasRowHasNoPeerFields(t *testing.T) {
|
||||
// LEFT JOIN preserves canvas rows (source_id NULL) but their
|
||||
// peer_name/peer_role come back as NULL — must omit from the
|
||||
// envelope (not emit empty strings or null literals).
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery(
|
||||
`LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id`,
|
||||
).
|
||||
WithArgs("ws-1", 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
"peer_name", "peer_role",
|
||||
}).
|
||||
// source_id NULL = canvas message; peer columns also NULL.
|
||||
AddRow("act-canvas", "ws-1", "a2a_receive", nil, "ws-1",
|
||||
"notify", "User said hi",
|
||||
[]byte(`{"params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`),
|
||||
nil, nil, nil, "ok", nil, time.Now(),
|
||||
nil, nil))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(resp) != 1 {
|
||||
t.Fatalf("want 1 row, got %d", len(resp))
|
||||
}
|
||||
r := resp[0]
|
||||
for _, k := range []string{"peer_name", "peer_role", "agent_card_url"} {
|
||||
if _, present := r[k]; present {
|
||||
t.Errorf("%s should be absent on canvas row; got %v", k, r[k])
|
||||
}
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_IncludePeerInfo_AttachmentsSurfaceFromRequestBody(t *testing.T) {
|
||||
// A peer_agent message with an inline file attachment must have
|
||||
// attachments[] populated on the envelope.
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
peerID := "11111111-2222-3333-4444-555555555555"
|
||||
mock.ExpectQuery(`LEFT JOIN workspaces`).
|
||||
WithArgs("ws-1", 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
"peer_name", "peer_role",
|
||||
}).
|
||||
AddRow("act-with-file", "ws-1", "a2a_receive", peerID, "ws-1",
|
||||
"message/send", "Agent message: see attached",
|
||||
[]byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
{"kind":"text","text":"see attached"},
|
||||
{"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}
|
||||
]}}}`),
|
||||
nil, nil, nil, "ok", nil, time.Now(),
|
||||
"Code Reviewer", "code reviewer"))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
r := resp[0]
|
||||
atts, ok := r["attachments"].([]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("attachments missing or wrong type: %T %v", r["attachments"], r["attachments"])
|
||||
}
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d: %v", len(atts), atts)
|
||||
}
|
||||
att := atts[0].(map[string]interface{})
|
||||
if att["kind"] != "file" || att["uri"] != "workspace:foo.pdf" || att["name"] != "foo.pdf" {
|
||||
t.Errorf("attachment shape: %v", att)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_IncludePeerInfo_Unset_NoJoinNoExtraFields(t *testing.T) {
|
||||
// Back-compat — when ?include=peer_info is NOT passed, the SELECT
|
||||
// uses unqualified column refs (no `activity_logs.` prefix) AND no
|
||||
// JOIN. Existing tests pass this implicitly; this test pins it
|
||||
// explicitly so a future refactor that accidentally turns the JOIN
|
||||
// always-on gets caught.
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
// Regex pinned: "FROM activity_logs WHERE workspace_id" — no JOIN
|
||||
// keyword between FROM and WHERE; no `activity_logs.` qualifier on
|
||||
// workspace_id.
|
||||
mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`).
|
||||
WithArgs("ws-1", 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
}).
|
||||
AddRow("act-1", "ws-1", "a2a_receive", "11111111-2222-3333-4444-555555555555", "ws-1",
|
||||
"message/send", "Hello",
|
||||
nil, nil, nil, nil, "ok", nil, time.Now()))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity", nil)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(resp) != 1 {
|
||||
t.Fatalf("want 1 row, got %d", len(resp))
|
||||
}
|
||||
// Confirm no peer_info enrichment leaks into the default envelope.
|
||||
for _, k := range []string{"peer_name", "peer_role", "agent_card_url", "attachments"} {
|
||||
if _, present := resp[0][k]; present {
|
||||
t.Errorf("%s must NOT appear without ?include=peer_info; got %v", k, resp[0][k])
|
||||
}
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_IncludePeerInfo_UnknownFlagIgnored(t *testing.T) {
|
||||
// ?include=bogus must NOT issue the JOIN — only the recognized
|
||||
// `peer_info` flag triggers enrichment. The unknown flag is silently
|
||||
// ignored (additive, opt-in convention).
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`).
|
||||
WithArgs("ws-1", 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
}))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=bogus", nil)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- flat upload manifest (chat_upload_receive) tests ----------
|
||||
|
||||
func TestKindFromMimeType(t *testing.T) {
|
||||
cases := []struct {
|
||||
mime string
|
||||
want string
|
||||
}{
|
||||
{"image/png", "image"},
|
||||
{"image/jpeg", "image"},
|
||||
{"image/", "image"}, // prefix-only is still image
|
||||
{"audio/mpeg", "audio"},
|
||||
{"audio/wav", "audio"},
|
||||
{"video/mp4", "video"},
|
||||
{"video/webm", "video"},
|
||||
{"application/pdf", "file"},
|
||||
{"text/plain", "file"},
|
||||
{"", "file"},
|
||||
{"unknown", "file"},
|
||||
{"image", "file"}, // no slash → not a prefix match
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := kindFromMimeType(tc.mime); got != tc.want {
|
||||
t.Errorf("kindFromMimeType(%q) = %q, want %q", tc.mime, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_Image(t *testing.T) {
|
||||
// Canvas chat_upload_receive shape: flat manifest at request_body
|
||||
// root with camelCase mimeType. The empirical example was a PNG
|
||||
// pasted into the canvas; surfaces here with kind=image,
|
||||
// mime_type=image/png (snake-case normalized), uri preserved.
|
||||
body := []byte(`{
|
||||
"uri":"platform-pending:091a9180-/26111d48-",
|
||||
"name":"pasted-2026-05-21T23-12-25-0-0.png",
|
||||
"size":677133,
|
||||
"file_id":"26111d48-",
|
||||
"mimeType":"image/png"
|
||||
}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d: %v", len(atts), atts)
|
||||
}
|
||||
att := atts[0]
|
||||
if att["kind"] != "image" {
|
||||
t.Errorf("kind: want image, got %v", att["kind"])
|
||||
}
|
||||
if att["uri"] != "platform-pending:091a9180-/26111d48-" {
|
||||
t.Errorf("uri: %v", att["uri"])
|
||||
}
|
||||
if att["mime_type"] != "image/png" {
|
||||
t.Errorf("mime_type normalization (camelCase→snake_case) failed: %v", att["mime_type"])
|
||||
}
|
||||
if att["name"] != "pasted-2026-05-21T23-12-25-0-0.png" {
|
||||
t.Errorf("name: %v", att["name"])
|
||||
}
|
||||
// camelCase `mimeType` MUST NOT leak into the projected envelope —
|
||||
// only snake_case `mime_type` is the wire convention.
|
||||
if _, present := att["mimeType"]; present {
|
||||
t.Errorf("camelCase mimeType leaked into envelope: %v", att)
|
||||
}
|
||||
if _, present := att["file_id"]; present {
|
||||
t.Errorf("file_id should not be surfaced on the attachment envelope (it's a canvas-internal id): %v", att)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_Audio(t *testing.T) {
|
||||
body := []byte(`{"uri":"platform-pending:ws/file","name":"voice.mp3","file_id":"abc","mimeType":"audio/mpeg"}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 || atts[0]["kind"] != "audio" {
|
||||
t.Fatalf("want audio kind, got %v", atts)
|
||||
}
|
||||
if atts[0]["mime_type"] != "audio/mpeg" {
|
||||
t.Errorf("mime_type: %v", atts[0]["mime_type"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_Video(t *testing.T) {
|
||||
body := []byte(`{"uri":"platform-pending:ws/file","name":"clip.mp4","file_id":"abc","mimeType":"video/mp4"}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 || atts[0]["kind"] != "video" {
|
||||
t.Fatalf("want video kind, got %v", atts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_GenericFile(t *testing.T) {
|
||||
// application/pdf has no image/audio/video prefix → kind=file
|
||||
body := []byte(`{"uri":"platform-pending:ws/file","name":"doc.pdf","file_id":"abc","mimeType":"application/pdf"}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 || atts[0]["kind"] != "file" {
|
||||
t.Fatalf("want file kind, got %v", atts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_NoMimeFallsToFile(t *testing.T) {
|
||||
// No mimeType at all — kind defaults to "file", mime_type omitted.
|
||||
body := []byte(`{"uri":"platform-pending:ws/file","name":"unknown.bin","file_id":"abc"}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["kind"] != "file" {
|
||||
t.Errorf("kind: want file (default), got %v", atts[0]["kind"])
|
||||
}
|
||||
if _, present := atts[0]["mime_type"]; present {
|
||||
t.Errorf("mime_type should be omitted when source has none, got %v", atts[0]["mime_type"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_SnakeCaseMimeTypeAccepted(t *testing.T) {
|
||||
// Defensive: a future canvas version (or non-canvas caller) that
|
||||
// already emits snake_case mime_type should still be parsed.
|
||||
body := []byte(`{"uri":"u","name":"n.png","mime_type":"image/png"}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["mime_type"] != "image/png" || atts[0]["kind"] != "image" {
|
||||
t.Errorf("snake_case mime_type not honored: %v", atts[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_FileIDOnlyIsSkipped(t *testing.T) {
|
||||
// file_id alone (no uri AND no name) is non-actionable — the
|
||||
// downstream adaptor can't render a discoverable file from just an
|
||||
// internal canvas id. Skip per the same minimum-info rule the
|
||||
// message-parts arm applies to empty parts.
|
||||
body := []byte(`{"file_id":"orphan-uuid","mimeType":"image/png"}`)
|
||||
if got := extractAttachmentsFromRequestBody(body); got != nil {
|
||||
t.Errorf("file_id-only manifest must be skipped, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_FlatUpload_NameOnlyIsKept(t *testing.T) {
|
||||
// Symmetric with the message-parts arm: a name without uri is still
|
||||
// useful (the downstream adaptor can render "user uploaded foo.png").
|
||||
body := []byte(`{"name":"only-name.bin","file_id":"abc","mimeType":"application/octet-stream"}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["name"] != "only-name.bin" {
|
||||
t.Errorf("name not preserved: %v", atts[0])
|
||||
}
|
||||
if _, present := atts[0]["uri"]; present {
|
||||
t.Errorf("uri should be omitted when absent in source, got %v", atts[0]["uri"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_MessagePartsTakesPrecedenceOverFlat(t *testing.T) {
|
||||
// If a single request_body somehow has BOTH params.message.parts[]
|
||||
// AND top-level uri/file_id (a pathological inbound), the
|
||||
// message-parts arm wins — that's the documented inbound shape and
|
||||
// it's been the only one historically extracted. The flat arm is a
|
||||
// fallback for shapes that have NO parts.
|
||||
body := []byte(`{
|
||||
"uri":"platform-pending:should-not-win",
|
||||
"file_id":"x",
|
||||
"mimeType":"image/png",
|
||||
"params":{"message":{"parts":[
|
||||
{"kind":"file","file":{"uri":"workspace:should-win.pdf","mime_type":"application/pdf","name":"win.pdf"}}
|
||||
]}}
|
||||
}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment (from parts[]), got %d: %v", len(atts), atts)
|
||||
}
|
||||
if atts[0]["uri"] != "workspace:should-win.pdf" {
|
||||
t.Errorf("message-parts arm did not take precedence: %v", atts[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_IncludePeerInfo_ChatUploadReceiveCanvasRow(t *testing.T) {
|
||||
// Wire-level integration: a canvas chat_upload_receive row (canvas
|
||||
// user pasted an image) with source_id NULL (canvas message), flat
|
||||
// upload manifest at request_body root. The `?include=peer_info`
|
||||
// projection must surface attachments[] populated from the flat-
|
||||
// upload-manifest arm while peer_name / peer_role / agent_card_url
|
||||
// remain absent (canvas row has no peer).
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery(`LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id`).
|
||||
WithArgs("ws-1", 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
"peer_name", "peer_role",
|
||||
}).
|
||||
// Empirical shape from 2026-05-21 ~23:12Z agents-team canvas paste.
|
||||
AddRow("act-upload", "ws-1", "chat_upload_receive", nil, "ws-1",
|
||||
"chat_upload_receive", "Canvas upload: pasted-2026-05-21T23-12-25-0-0.png",
|
||||
[]byte(`{
|
||||
"uri":"platform-pending:091a9180-b303-4a20-aefe-3a4a675b8aa4/26111d48-aaaa-bbbb-cccc-dddddddddddd",
|
||||
"name":"pasted-2026-05-21T23-12-25-0-0.png",
|
||||
"size":677133,
|
||||
"file_id":"26111d48-aaaa-bbbb-cccc-dddddddddddd",
|
||||
"mimeType":"image/png"
|
||||
}`),
|
||||
nil, nil, nil, "ok", nil, time.Now(),
|
||||
nil, nil))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(resp) != 1 {
|
||||
t.Fatalf("want 1 row, got %d", len(resp))
|
||||
}
|
||||
r := resp[0]
|
||||
// Canvas row → no peer fields.
|
||||
for _, k := range []string{"peer_name", "peer_role", "agent_card_url"} {
|
||||
if _, present := r[k]; present {
|
||||
t.Errorf("%s must NOT appear on canvas upload row; got %v", k, r[k])
|
||||
}
|
||||
}
|
||||
// attachments[] populated from the flat-upload arm.
|
||||
atts, ok := r["attachments"].([]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("attachments missing or wrong type: %T %v", r["attachments"], r["attachments"])
|
||||
}
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment from flat manifest, got %d: %v", len(atts), atts)
|
||||
}
|
||||
att := atts[0].(map[string]interface{})
|
||||
if att["kind"] != "image" {
|
||||
t.Errorf("kind: want image (image/png prefix), got %v", att["kind"])
|
||||
}
|
||||
if att["mime_type"] != "image/png" {
|
||||
t.Errorf("mime_type wire shape: want snake_case image/png, got %v", att["mime_type"])
|
||||
}
|
||||
if att["uri"] != "platform-pending:091a9180-b303-4a20-aefe-3a4a675b8aa4/26111d48-aaaa-bbbb-cccc-dddddddddddd" {
|
||||
t.Errorf("uri preserved verbatim: got %v", att["uri"])
|
||||
}
|
||||
if att["name"] != "pasted-2026-05-21T23-12-25-0-0.png" {
|
||||
t.Errorf("name: %v", att["name"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity test using the existing test broadcaster setup — verifies the
|
||||
// extractAttachments helper round-trips through json.Marshal cleanly
|
||||
// (no map ordering issues, no type-coercion surprises).
|
||||
func TestExtractAttachmentsFromRequestBody_RoundTripsThroughJSON(t *testing.T) {
|
||||
body := []byte(`{"params":{"message":{"parts":[{"kind":"file","file":{"uri":"workspace:r.bin","mime_type":"application/octet-stream","name":"r.bin"}}]}}}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
b, err := json.Marshal(atts)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
var decoded []map[string]interface{}
|
||||
if err := json.Unmarshal(b, &decoded); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if len(decoded) != 1 || decoded[0]["uri"] != "workspace:r.bin" {
|
||||
t.Fatalf("round-trip mismatch: %v", decoded)
|
||||
}
|
||||
_ = fmt.Sprintf // keep fmt import live if test trimming removes usage
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// AdminWorkspaceTokenHandler lets tenant admins mint the first workspace
|
||||
// bearer for managed SaaS workspaces whose runtime receives its token later
|
||||
// through registry registration.
|
||||
type AdminWorkspaceTokenHandler struct{}
|
||||
|
||||
func NewAdminWorkspaceTokenHandler() *AdminWorkspaceTokenHandler {
|
||||
return &AdminWorkspaceTokenHandler{}
|
||||
}
|
||||
|
||||
// Create handles POST /admin/workspaces/:id/tokens. The route must be mounted
|
||||
// behind AdminAuth; the plaintext token is returned exactly once.
|
||||
func (h *AdminWorkspaceTokenHandler) Create(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
if !validWorkspaceID(workspaceID) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
|
||||
return
|
||||
}
|
||||
|
||||
var existing string
|
||||
err := db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT id FROM workspaces WHERE id = $1 AND status <> 'removed'`,
|
||||
workspaceID).Scan(&existing)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
log.Printf("admin workspace tokens: workspace lookup failed for %s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "workspace lookup failed"})
|
||||
return
|
||||
}
|
||||
|
||||
var count int
|
||||
if err := db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT COUNT(*) FROM workspace_auth_tokens WHERE workspace_id = $1 AND revoked_at IS NULL`,
|
||||
workspaceID).Scan(&count); err != nil {
|
||||
log.Printf("admin workspace tokens: count failed for %s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count tokens"})
|
||||
return
|
||||
}
|
||||
if count >= maxTokensPerWorkspace {
|
||||
c.JSON(http.StatusTooManyRequests, gin.H{"error": fmt.Sprintf("maximum %d active tokens per workspace", maxTokensPerWorkspace)})
|
||||
return
|
||||
}
|
||||
|
||||
token, err := wsauth.IssueToken(c.Request.Context(), db.DB, workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("admin workspace tokens: issue failed for %s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create token"})
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("admin workspace tokens: issued token for workspace %s", workspaceID)
|
||||
c.JSON(http.StatusCreated, gin.H{
|
||||
"auth_token": token,
|
||||
"workspace_id": workspaceID,
|
||||
"message": "Save this token now — it cannot be retrieved again.",
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func TestAdminWorkspaceTokenHandler_Create_HappyPath(t *testing.T) {
|
||||
mock, cleanup := withMockDB(t)
|
||||
defer cleanup()
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE id = \$1 AND status <> 'removed'`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsUUID1))
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
mock.ExpectExec(`INSERT INTO workspace_auth_tokens`).
|
||||
WithArgs(wsUUID1, sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
w := makeReq(t, NewAdminWorkspaceTokenHandler().Create, "POST",
|
||||
"/admin/workspaces/"+wsUUID1+"/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var body struct {
|
||||
AuthToken string `json:"auth_token"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
if body.AuthToken == "" || body.WorkspaceID != wsUUID1 {
|
||||
t.Fatalf("unexpected body: %+v", body)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdminWorkspaceTokenHandler_Create_MissingWorkspace(t *testing.T) {
|
||||
mock, cleanup := withMockDB(t)
|
||||
defer cleanup()
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE id = \$1 AND status <> 'removed'`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
w := makeReq(t, NewAdminWorkspaceTokenHandler().Create, "POST",
|
||||
"/admin/workspaces/"+wsUUID1+"/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdminWorkspaceTokenHandler_Create_RateLimited(t *testing.T) {
|
||||
mock, cleanup := withMockDB(t)
|
||||
defer cleanup()
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE id = \$1 AND status <> 'removed'`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsUUID1))
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(maxTokensPerWorkspace))
|
||||
|
||||
w := makeReq(t, NewAdminWorkspaceTokenHandler().Create, "POST",
|
||||
"/admin/workspaces/"+wsUUID1+"/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
|
||||
|
||||
if w.Code != http.StatusTooManyRequests {
|
||||
t.Fatalf("expected 429, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdminWorkspaceTokenHandler_Create_IssueFails(t *testing.T) {
|
||||
mock, cleanup := withMockDB(t)
|
||||
defer cleanup()
|
||||
|
||||
mock.ExpectQuery(`SELECT id FROM workspaces WHERE id = \$1 AND status <> 'removed'`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsUUID1))
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
|
||||
WithArgs(wsUUID1).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
mock.ExpectExec(`INSERT INTO workspace_auth_tokens`).
|
||||
WillReturnError(errors.New("disk full"))
|
||||
|
||||
w := makeReq(t, NewAdminWorkspaceTokenHandler().Create, "POST",
|
||||
"/admin/workspaces/"+wsUUID1+"/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
@@ -216,69 +216,102 @@ curl -fsS -X POST "{{PLATFORM_URL}}/registry/register" \
|
||||
const externalChannelTemplate = `# Claude Code channel — bridges this workspace's A2A traffic into your
|
||||
# Claude Code session. No tunnel/public URL needed (polling-based).
|
||||
#
|
||||
# Prereq: Bun installed (channel plugins are Bun scripts).
|
||||
# bun --version # must print a version number
|
||||
# Prereq: Bun 1.3+ installed (channel plugins are Bun scripts).
|
||||
# bun --version # must print a version (1.3.x or newer)
|
||||
#
|
||||
# 1. Inside Claude Code, install the channel plugin from its GitHub repo.
|
||||
# The plugin is NOT on Anthropic's default allowlist, so a one-time
|
||||
# marketplace-add is needed before install:
|
||||
# 1. Inside Claude Code, install the channel plugin. The plugin lives in
|
||||
# Molecule's own Gitea marketplace (not Anthropic's default), so a
|
||||
# one-time marketplace-add is needed before install:
|
||||
#
|
||||
# /plugin marketplace add https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel.git
|
||||
# /plugin install molecule@molecule-channel
|
||||
#
|
||||
# Then either run /reload-plugins or restart Claude Code so the
|
||||
# plugin is registered.
|
||||
# Then /reload-plugins (or restart Claude Code) so the plugin is
|
||||
# registered.
|
||||
#
|
||||
# 2. Create the per-watched-workspace config file:
|
||||
# 2. Create (or extend) the per-host config file. The canonical SSOT
|
||||
# shape is MOLECULE_WORKSPACES_JSON — a JSON array of
|
||||
# {id, token, platform_url} objects. One plugin instance can watch
|
||||
# many workspaces across many tenants; append more objects to the
|
||||
# array (separate them with commas, NOT a newline):
|
||||
mkdir -p ~/.claude/channels/molecule
|
||||
cat > ~/.claude/channels/molecule/.env <<'EOF'
|
||||
MOLECULE_PLATFORM_URL={{PLATFORM_URL}}
|
||||
MOLECULE_WORKSPACE_IDS={{WORKSPACE_ID}}
|
||||
MOLECULE_WORKSPACE_TOKENS=<paste auth_token from create response>
|
||||
MOLECULE_WORKSPACES_JSON=[{"id":"{{WORKSPACE_ID}}","token":"<paste auth_token from create response>","platform_url":"{{PLATFORM_URL}}"}]
|
||||
EOF
|
||||
chmod 600 ~/.claude/channels/molecule/.env
|
||||
|
||||
# 3. Launch Claude Code with the channel enabled. Custom (non-Anthropic-
|
||||
# allowlisted) channels need the --dangerously-load-development-channels
|
||||
# flag to opt in — without it, you'll see "not on the approved channels
|
||||
# allowlist" on startup.
|
||||
claude --dangerously-load-development-channels \
|
||||
--channels plugin:molecule@molecule-channel
|
||||
# (Legacy single-platform shape — MOLECULE_PLATFORM_URL + comma-separated
|
||||
# MOLECULE_WORKSPACE_IDS + MOLECULE_WORKSPACE_TOKENS — is still supported
|
||||
# for back-compat but does NOT work across multiple tenant URLs. Use
|
||||
# MOLECULE_WORKSPACES_JSON above unless you have a specific reason.)
|
||||
|
||||
# 3. Launch Claude Code with the channel enabled. The channel spec is the
|
||||
# VALUE of --dangerously-load-development-channels — NOT a separate
|
||||
# --channels flag (that flag does not exist in current Claude Code;
|
||||
# passing it errors with "entries must be tagged: --channels").
|
||||
claude --dangerously-load-development-channels plugin:molecule@molecule-channel
|
||||
|
||||
# You should see on stderr:
|
||||
# molecule channel: connected — watching 1 workspace(s) at {{PLATFORM_URL}}
|
||||
# molecule channel: connected — watching N workspace(s) across M platform(s)
|
||||
# targets: <platform_url>: <workspace_id>
|
||||
#
|
||||
# Inbound A2A messages now surface as conversation turns. Claude's
|
||||
# replies route back via the reply_to_workspace MCP tool — no extra
|
||||
# wiring on your side.
|
||||
# Inbound A2A messages now surface as conversation turns (synthetic
|
||||
# <channel ...> tags). Claude's replies route back via the
|
||||
# reply_to_workspace / send_message_to_user MCP tools.
|
||||
#
|
||||
# Multi-workspace note: when watching more than one workspace, every
|
||||
# outbound tool call (send_message_to_user, reply_to_workspace,
|
||||
# delegate_task, list_peers) MUST pass _as_workspace=<id> so the plugin
|
||||
# knows which token to authenticate with. The host returns -32603 if you
|
||||
# forget — the synthetic <channel> tag's "watching_as" attribute tells
|
||||
# you which id to use.
|
||||
#
|
||||
# Common errors:
|
||||
# "plugin not installed" → Step 1 didn't run; run /plugin install
|
||||
# "plugin not installed" → Step 1 didn't run; run /plugin
|
||||
# marketplace add + /plugin install
|
||||
# inside Claude Code, then /reload-plugins.
|
||||
# "not on approved channels allowlist" → Add --dangerously-load-development-channels
|
||||
# to the launch command (Step 3).
|
||||
# "config-missing" → ~/.claude/channels/molecule/.env not
|
||||
# readable; re-run Step 2 and check chmod.
|
||||
# "entries must be tagged" → You passed --channels separately.
|
||||
# Put plugin:molecule@molecule-channel
|
||||
# directly after
|
||||
# --dangerously-load-development-channels.
|
||||
# "not on approved channels allowlist" → Org policy gating. See "managed
|
||||
# settings" note below.
|
||||
# "config-missing" → ~/.claude/channels/molecule/.env
|
||||
# not readable; re-run Step 2 and check
|
||||
# chmod 600.
|
||||
#
|
||||
# Team/Enterprise orgs: the --dangerously-load-development-channels flag is
|
||||
# blocked by managed settings. Your admin must set channelsEnabled=true and
|
||||
# add the plugin to allowedChannelPlugins in claude.ai admin settings.
|
||||
# Team/Enterprise plans: the channel allowlist is gated by org policy
|
||||
# AND must be written to the local managed-settings.json file on disk
|
||||
# (not the claude.ai web admin UI — there is no web toggle for this).
|
||||
# Path per OS:
|
||||
# macOS: /Library/Application Support/ClaudeCode/managed-settings.json
|
||||
# Linux: /etc/claude-code/managed-settings.json
|
||||
# Windows: C:\ProgramData\ClaudeCode\managed-settings.json
|
||||
# Set channelsEnabled: true and add
|
||||
# { "plugin": "molecule", "marketplace": "molecule-channel" }
|
||||
# to allowedChannelPlugins. Restart Claude Code after writing the file.
|
||||
# A user-level ~/.claude/settings.json does NOT work on Team/Enterprise
|
||||
# — this is the single most common reason a freshly-installed plugin
|
||||
# appears to do nothing.
|
||||
#
|
||||
# Multi-workspace: comma-separate IDs and tokens (same order). See
|
||||
# https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel for
|
||||
# pairing flow, push-mode upgrade, and v0.2 roadmap.
|
||||
# Pro/Max plans skip the channelsEnabled gate but still need the
|
||||
# allowedChannelPlugins entry in the managed-settings file.
|
||||
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/claude-code-channel-plugin
|
||||
# Full README: https://git.moleculesai.app/molecule-ai/molecule-mcp-claude-channel
|
||||
# Common errors:
|
||||
# • "plugin not installed" — run /plugin marketplace add then
|
||||
# /plugin install lines above; /reload-plugins or restart.
|
||||
# • "entries must be tagged: --channels" — the launch flag form
|
||||
# changed; use --dangerously-load-development-channels plugin:molecule@molecule-channel
|
||||
# (channel spec is the VALUE, not a separate --channels flag).
|
||||
# • "not on the approved channels allowlist" — custom channels need
|
||||
# --dangerously-load-development-channels; team/enterprise orgs
|
||||
# need admin to set channelsEnabled + allowedChannelPlugins.
|
||||
# allowedChannelPlugins in /Library/Application Support/ClaudeCode/managed-settings.json
|
||||
# (macOS) / equivalent on Linux+Windows. NOT a web setting.
|
||||
# • "Inbound messages not arriving" — stderr should show
|
||||
# "molecule channel: connected — watching N workspace(s)";
|
||||
# verify ~/.claude/channels/molecule/.env has PLATFORM_URL + token.
|
||||
# verify ~/.claude/channels/molecule/.env shape is MOLECULE_WORKSPACES_JSON.
|
||||
`
|
||||
|
||||
// externalUniversalMcpTemplate — runtime-agnostic standalone path.
|
||||
@@ -670,7 +703,15 @@ def heartbeat(client, url, ws, tok, start):
|
||||
r.raise_for_status()
|
||||
|
||||
def poll_inbound(client, url, ws, tok, since_id):
|
||||
params = {"since_secs": "30", "limit": "50"}
|
||||
# include=peer_info opts into Layer 1's row-level projection so each
|
||||
# polled activity carries peer_name, peer_role, agent_card_url, and
|
||||
# attachments[] inline (when source_id resolves to a peer / when the
|
||||
# message included a file). Pre-Layer-1 platforms ignore unknown query
|
||||
# params and return the bare row shape, so this is back-compat. Use
|
||||
# the extra fields in your reply logic — e.g. address the sender by
|
||||
# peer_name rather than UUID, or Read attached files via the workspace:
|
||||
# URIs in attachments[].
|
||||
params = {"since_secs": "30", "limit": "50", "include": "peer_info"}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok))
|
||||
@@ -737,10 +778,16 @@ python3 ~/.molecule-ai/kimi-{{MCP_SERVER_NAME}}/kimi_bridge.py
|
||||
# What the script does:
|
||||
# • Registers the workspace in poll mode (no public URL needed)
|
||||
# • Heartbeats every 20s to keep STATUS = online on the canvas
|
||||
# • Polls /workspaces/:id/activity every 5s for new canvas messages
|
||||
# • Polls /workspaces/:id/activity?include=peer_info every 5s — Layer 1
|
||||
# enrichment surfaces peer_name / peer_role / agent_card_url /
|
||||
# attachments[] inline on each polled row when applicable
|
||||
# • Echo-replies via POST /workspaces/:id/notify
|
||||
#
|
||||
# To change the reply logic, edit the send_reply() call inside the loop.
|
||||
# Each polled item has top-level peer_name / peer_role / agent_card_url
|
||||
# fields (peer_agent rows) and attachments[] (any kind) when Layer 1 is
|
||||
# enabled on the platform — use them to disambiguate senders and to Read
|
||||
# attached files via the workspace: URIs.
|
||||
# To send a one-off reply from another terminal:
|
||||
# curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \
|
||||
# -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-{{MCP_SERVER_NAME}}/env | grep TOKEN | cut -d= -f2)" \
|
||||
|
||||
@@ -118,3 +118,86 @@ func TestExternalTemplates_NoBrokenMoleculeAIGitHubURLs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestExternalChannelTemplate_LaunchFlagShape pins the Claude Code channel
|
||||
// snippet to the working launch invocation. The channel spec must be the
|
||||
// VALUE of --dangerously-load-development-channels, NOT a separate
|
||||
// --channels flag. The two-flag form (`--dangerously-load-development-channels
|
||||
// --channels plugin:molecule@...`) errors with "entries must be tagged:
|
||||
// --channels" on current Claude Code builds (2.1.143+) and silently no-ops
|
||||
// on older ones — either way, new users hit a wall on first launch.
|
||||
//
|
||||
// Empirical: hit by a session walking through this exact snippet 2026-05-21;
|
||||
// the broken form was copy-pasted from this template, ran, errored, and
|
||||
// confused the operator into believing the plugin install was broken when
|
||||
// the snippet itself was the bug.
|
||||
func TestExternalChannelTemplate_LaunchFlagShape(t *testing.T) {
|
||||
// The broken two-flag form. If this string ever appears in the
|
||||
// snippet again, the same onboarding pothole returns.
|
||||
bannedFormBroken := "--dangerously-load-development-channels \\\n --channels plugin:molecule@molecule-channel"
|
||||
if strings.Contains(externalChannelTemplate, bannedFormBroken) {
|
||||
t.Errorf("externalChannelTemplate contains the broken two-flag launch form. " +
|
||||
"Use --dangerously-load-development-channels plugin:molecule@molecule-channel (spec as value, not a separate --channels flag).")
|
||||
}
|
||||
|
||||
// The single-flag form must be present.
|
||||
requiredFormGood := "--dangerously-load-development-channels plugin:molecule@molecule-channel"
|
||||
if !strings.Contains(externalChannelTemplate, requiredFormGood) {
|
||||
t.Errorf("externalChannelTemplate must contain %q so operators see the working launch invocation", requiredFormGood)
|
||||
}
|
||||
}
|
||||
|
||||
// TestExternalChannelTemplate_CanonicalEnvShape pins the canvas-served
|
||||
// .env example to the canonical SSOT shape (MOLECULE_WORKSPACES_JSON)
|
||||
// rather than the legacy single-platform shape. The legacy form
|
||||
// (MOLECULE_PLATFORM_URL + comma-separated IDs/TOKENS) is still accepted
|
||||
// by the channel plugin's parseWorkspaceTargets but is single-tenant
|
||||
// only — it silently fails to onboard users who want to watch multiple
|
||||
// platforms (e.g. hongming + agents-team from the same plugin instance),
|
||||
// which is the post-PR#15 expected use case.
|
||||
func TestExternalChannelTemplate_CanonicalEnvShape(t *testing.T) {
|
||||
if !strings.Contains(externalChannelTemplate, "MOLECULE_WORKSPACES_JSON=") {
|
||||
t.Errorf("externalChannelTemplate must use MOLECULE_WORKSPACES_JSON as the canonical .env shape (the post-PR#15 SSOT)")
|
||||
}
|
||||
// The JSON example must contain the workspace_id + platform_url placeholders
|
||||
// so the canvas substitutes them at serve time.
|
||||
for _, ph := range []string{"{{WORKSPACE_ID}}", "{{PLATFORM_URL}}"} {
|
||||
if !strings.Contains(externalChannelTemplate, ph) {
|
||||
t.Errorf("externalChannelTemplate must contain placeholder %q so the canvas substitutes per-workspace values", ph)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestPollingTemplates_OptIntoPeerInfo pins the invariant that any template
|
||||
// which calls /workspaces/:id/activity for inbound delivery requests the
|
||||
// Layer 1 enrichment via ?include=peer_info. Without this opt-in, the
|
||||
// platform returns bare activity rows and the operator's bridge / channel
|
||||
// loses peer_name / peer_role / agent_card_url / attachments[] — they're
|
||||
// available on the server but not delivered.
|
||||
//
|
||||
// Pre-Layer-1 platforms ignore unknown query params (HTTP spec: filters
|
||||
// not understood are dropped), so this is back-compat across deploys.
|
||||
//
|
||||
// The Claude Code channel template doesn't include the poll URL in this
|
||||
// snippet — its polling lives in the plugin's own server.ts (handled by
|
||||
// molecule-mcp-claude-channel PR#21). The Kimi template DOES include a
|
||||
// poll loop in its kimi_bridge.py block, so the invariant applies there.
|
||||
func TestPollingTemplates_OptIntoPeerInfo(t *testing.T) {
|
||||
pollingTemplates := map[string]string{
|
||||
"externalKimiTemplate": externalKimiTemplate,
|
||||
}
|
||||
for name, body := range pollingTemplates {
|
||||
// If the snippet polls /activity, it must opt into peer_info.
|
||||
// The detection is intentionally loose ("/activity" appears in
|
||||
// the script) — operators who customize the script keep the
|
||||
// invariant only if the include hint is in the template.
|
||||
if !strings.Contains(body, "/activity") {
|
||||
t.Errorf("%s no longer polls /activity — review whether this test still applies", name)
|
||||
continue
|
||||
}
|
||||
if !strings.Contains(body, `"include": "peer_info"`) && !strings.Contains(body, "include=peer_info") {
|
||||
t.Errorf("%s polls /activity without ?include=peer_info — operators lose Layer 1 enrichment "+
|
||||
"(peer_name / peer_role / agent_card_url / attachments[]). Add the param to the poll URL.", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,7 +159,8 @@ func generateAppInstallationToken() (string, time.Time, error) {
|
||||
req, _ := http.NewRequest("POST", fmt.Sprintf("https://api.github.com/app/installations/%d/access_tokens", installID), nil)
|
||||
req.Header.Set("Authorization", "Bearer "+signed)
|
||||
req.Header.Set("Accept", "application/vnd.github+json")
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", time.Time{}, err
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) {
|
||||
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
|
||||
// delivery_mode defaults to "push" when payload omits it (#2339).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -69,7 +69,7 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) {
|
||||
mock.ExpectBegin()
|
||||
// delivery_mode defaults to "push" when payload omits it (#2339).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -291,7 +291,7 @@ func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
|
||||
@@ -368,7 +368,7 @@ func TestWorkspaceCreate(t *testing.T) {
|
||||
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
|
||||
// delivery_mode defaults to "push" when payload omits it (#2339).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Expect transaction commit (no secrets in this payload)
|
||||
|
||||
@@ -84,6 +84,7 @@ type mcpTool struct {
|
||||
type MCPHandler struct {
|
||||
database *sql.DB
|
||||
broadcaster *events.Broadcaster
|
||||
a2aProxy func(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error)
|
||||
|
||||
// memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe:
|
||||
// every v2 tool calls memoryV2Available() first and returns a
|
||||
@@ -98,6 +99,14 @@ func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandle
|
||||
return &MCPHandler{database: database, broadcaster: broadcaster}
|
||||
}
|
||||
|
||||
func (h *MCPHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
|
||||
if h.a2aProxy != nil {
|
||||
return h.a2aProxy(ctx, workspaceID, body, callerID, logActivity)
|
||||
}
|
||||
wh := NewWorkspaceHandler(h.broadcaster, nil, "", "")
|
||||
return wh.ProxyA2ARequest(ctx, workspaceID, body, callerID, logActivity)
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Tool definitions (mirrors workspace/a2a_mcp_server.py TOOLS list)
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -53,6 +53,15 @@ func mcpPost(t *testing.T, h *MCPHandler, workspaceID string, body interface{})
|
||||
return w
|
||||
}
|
||||
|
||||
func expectCanCommunicateSiblings(mock sqlmock.Sqlmock, callerID, targetID, parentID string) {
|
||||
mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(callerID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(callerID, parentID))
|
||||
mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(targetID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(targetID, parentID))
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// initialize
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -178,6 +187,98 @@ func TestMCPHandler_ToolsList_ContainsExpectedTools(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMCPHandler_DelegateTask_RoutesThroughPlatformA2AProxy(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
var gotTarget, gotCaller string
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
|
||||
gotTarget = workspaceID
|
||||
gotCaller = callerID
|
||||
if !logActivity {
|
||||
t.Fatal("delegate_task should log through platform A2A proxy")
|
||||
}
|
||||
if !strings.Contains(string(body), "do work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "do work",
|
||||
}, mcpCallTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task returned error: %v", err)
|
||||
}
|
||||
if out != "done" {
|
||||
t.Fatalf("delegate_task response = %q, want done", out)
|
||||
}
|
||||
if gotTarget != targetID || gotCaller != callerID {
|
||||
t.Fatalf("proxy called with target=%q caller=%q, want target=%q caller=%q", gotTarget, gotCaller, targetID, callerID)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
called := make(chan struct{}, 1)
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
if !strings.Contains(string(body), "async work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
called <- struct{}{}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "async work",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
waitGlobalAsyncForTest()
|
||||
select {
|
||||
case <-called:
|
||||
default:
|
||||
t.Fatal("async delegate did not call platform A2A proxy")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// notifications/initialized
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -7,24 +7,19 @@ package handlers
|
||||
// and A2A response parsing helpers.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// insertMCPDelegationRow writes a delegation activity row so the canvas
|
||||
// Agent Comms tab can show the task text for MCP-initiated delegations.
|
||||
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
|
||||
@@ -190,15 +185,6 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
// Non-fatal: still make the A2A call even if activity log write fails.
|
||||
}
|
||||
|
||||
agentURL, err := mcpResolveURL(ctx, h.database, targetID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// SSRF defence: reject private/metadata URLs before making outbound call.
|
||||
if err := isSafeURL(agentURL); err != nil {
|
||||
return "", fmt.Errorf("invalid workspace URL: %w", err)
|
||||
}
|
||||
|
||||
a2aBody, err := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": uuid.New().String(),
|
||||
@@ -218,36 +204,17 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
reqCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(reqCtx, "POST", agentURL+"/a2a", bytes.NewReader(a2aBody))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
// X-Workspace-ID identifies this caller to the A2A proxy. The /workspaces/:id/a2a
|
||||
// endpoint is intentionally outside WorkspaceAuth (agents do not hold bearer tokens
|
||||
// to peer workspaces). Access control is enforced by CanCommunicate above, which
|
||||
// already validated callerID → targetID before this request is constructed.
|
||||
// callerID was authenticated by WorkspaceAuth on the MCP bridge entry point,
|
||||
// so this header reflects a verified caller identity, not a spoofable value.
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
status, body, err := h.proxyA2ARequest(reqCtx, targetID, a2aBody, callerID, true)
|
||||
if err != nil {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", err.Error())
|
||||
return "", fmt.Errorf("A2A call failed: %w", err)
|
||||
return "", fmt.Errorf("A2A proxy failed: %w", err)
|
||||
}
|
||||
if status < 200 || status >= 300 {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", fmt.Sprintf("A2A proxy returned status %d", status))
|
||||
return "", fmt.Errorf("A2A proxy returned status %d", status)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// A 200/500 from the peer still means the call was dispatched — only
|
||||
// network errors are truly "failed". Status 'dispatched' is correct for
|
||||
// any HTTP response (peer's A2A layer handles the actual processing).
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "dispatched", "")
|
||||
|
||||
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read response: %w", err)
|
||||
}
|
||||
|
||||
return extractA2AText(body), nil
|
||||
}
|
||||
|
||||
@@ -278,24 +245,13 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
|
||||
// Fire and forget in a detached goroutine. Use a background context so
|
||||
// the call is not cancelled when the HTTP request completes.
|
||||
// RFC internal#524 Layer 1: globalGoAsync — the detached call reads
|
||||
// db.DB (mcpResolveURL + updateMCPDelegationStatus) and must be
|
||||
// drained by drainTestAsync before any t.Cleanup-driven db.DB swap.
|
||||
// RFC internal#524 Layer 1: globalGoAsync — the detached call reads db.DB
|
||||
// through the platform A2A proxy and must be drained by drainTestAsync
|
||||
// before any t.Cleanup-driven db.DB swap.
|
||||
globalGoAsync(func() {
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
agentURL, err := mcpResolveURL(bgCtx, h.database, targetID)
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: resolve URL for %s: %v", targetID, err)
|
||||
return
|
||||
}
|
||||
// SSRF defence: reject private/metadata URLs before making outbound call.
|
||||
if err := isSafeURL(agentURL); err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: unsafe URL for %s: %v", targetID, err)
|
||||
return
|
||||
}
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": delegationID,
|
||||
@@ -309,22 +265,15 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
},
|
||||
})
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(bgCtx, "POST", agentURL+"/a2a", bytes.NewReader(a2aBody))
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: create request: %v", err)
|
||||
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
|
||||
if err != nil || status < 200 || status >= 300 {
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s: %v", targetID, err)
|
||||
} else {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s returned status %d", targetID, status)
|
||||
}
|
||||
return
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A call to %s: %v", targetID, err)
|
||||
return
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
// Drain response so the connection can be reused.
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
})
|
||||
|
||||
return fmt.Sprintf(`{"task_id":%q,"status":"dispatched","target_id":%q}`, delegationID, targetID), nil
|
||||
@@ -405,7 +354,6 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
||||
return "Message sent.", nil
|
||||
}
|
||||
|
||||
|
||||
func (h *MCPHandler) toolCommitMemory(ctx context.Context, workspaceID string, args map[string]interface{}) (string, error) {
|
||||
// PR-6 (RFC #2728) compat shim: when the v2 plugin is wired
|
||||
// (MEMORY_PLUGIN_URL set), translate legacy scope→namespace and
|
||||
@@ -534,56 +482,6 @@ func (h *MCPHandler) toolRecallMemory(ctx context.Context, workspaceID string, a
|
||||
// Helpers
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// mcpResolveURL returns a routable URL for a workspace's A2A server.
|
||||
//
|
||||
// Resolution order:
|
||||
// 1. Docker-internal URL cache (set by provisioner; correct when platform is in Docker)
|
||||
// 2. Redis URL cache
|
||||
// 3. DB `url` column fallback, with 127.0.0.1→Docker bridge rewrite when in Docker
|
||||
//
|
||||
// SECURITY (F1083 / #1130): all three paths run the returned URL through
|
||||
// validateAgentURL to block SSRF targets (private IPs, loopback, cloud metadata).
|
||||
func mcpResolveURL(ctx context.Context, database *sql.DB, workspaceID string) (string, error) {
|
||||
if platformInDocker {
|
||||
if url, err := db.GetCachedInternalURL(ctx, workspaceID); err == nil && url != "" {
|
||||
if err := validateAgentURL(url); err != nil {
|
||||
return "", fmt.Errorf("workspace %s: forbidden URL from internal cache: %w", workspaceID, err)
|
||||
}
|
||||
return url, nil
|
||||
}
|
||||
}
|
||||
if url, err := db.GetCachedURL(ctx, workspaceID); err == nil && url != "" {
|
||||
if platformInDocker && strings.HasPrefix(url, "http://127.0.0.1:") {
|
||||
return provisioner.InternalURL(workspaceID), nil
|
||||
}
|
||||
if err := validateAgentURL(url); err != nil {
|
||||
return "", fmt.Errorf("workspace %s: forbidden URL from Redis cache: %w", workspaceID, err)
|
||||
}
|
||||
return url, nil
|
||||
}
|
||||
|
||||
var urlStr sql.NullString
|
||||
var status string
|
||||
if err := database.QueryRowContext(ctx,
|
||||
`SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&urlStr, &status); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "", fmt.Errorf("workspace %s not found", workspaceID)
|
||||
}
|
||||
return "", fmt.Errorf("workspace lookup failed: %w", err)
|
||||
}
|
||||
if !urlStr.Valid || urlStr.String == "" {
|
||||
return "", fmt.Errorf("workspace %s has no URL (status: %s)", workspaceID, status)
|
||||
}
|
||||
if platformInDocker && strings.HasPrefix(urlStr.String, "http://127.0.0.1:") {
|
||||
return provisioner.InternalURL(workspaceID), nil
|
||||
}
|
||||
if err := validateAgentURL(urlStr.String); err != nil {
|
||||
return "", fmt.Errorf("workspace %s: forbidden URL from DB: %w", workspaceID, err)
|
||||
}
|
||||
return urlStr.String, nil
|
||||
}
|
||||
|
||||
// extractA2AText extracts human-readable text from an A2A JSON-RPC response body.
|
||||
// Falls back to the raw JSON when no text part can be found.
|
||||
func extractA2AText(body []byte) string {
|
||||
@@ -632,4 +530,3 @@ func extractA2AText(body []byte) string {
|
||||
b, _ := json.Marshal(result)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) {
|
||||
// Go's net.ParseIP.To4() before Contains() runs, so the IPv4 rules above
|
||||
// catch those without a separate entry.
|
||||
//
|
||||
// F1083/#1130 (SSRF on mcpResolveURL / a2a_proxy resolveAgentURL): in
|
||||
// F1083/#1130 (SSRF on direct A2A URL resolution): in
|
||||
// addition to blocking IP literals, DNS names are now resolved and each
|
||||
// returned IP is checked against the blocklist. This closes the gap where
|
||||
// an attacker could register agent.example.com pointing to 169.254.169.254.
|
||||
|
||||
@@ -214,6 +214,11 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace fields"})
|
||||
return
|
||||
}
|
||||
// #1686 Phase 1: validate per-workspace compute overrides.
|
||||
if err := models.ValidateComputeConfig(payload.Compute); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
awarenessNamespace := workspaceAwarenessNamespace(id)
|
||||
@@ -398,11 +403,22 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
// double-click. Helper retries with " (2)", " (3)", … up to maxNameSuffix,
|
||||
// returns the actually-persisted name (which we MUST thread back into
|
||||
// payload + broadcast so the canvas displays what the DB has).
|
||||
var computeInstanceType *string
|
||||
var computeVolumeRootGB *int
|
||||
if payload.Compute != nil {
|
||||
if payload.Compute.InstanceType != "" {
|
||||
computeInstanceType = &payload.Compute.InstanceType
|
||||
}
|
||||
if payload.Compute.Volume.RootGB != 0 {
|
||||
computeVolumeRootGB = &payload.Compute.Volume.RootGB
|
||||
}
|
||||
}
|
||||
|
||||
const insertWorkspaceSQL = `
|
||||
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12)
|
||||
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode, compute_instance_type, compute_volume_root_gb)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12, $13, $14)
|
||||
`
|
||||
insertArgs := []any{id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode}
|
||||
insertArgs := []any{id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode, computeInstanceType, computeVolumeRootGB}
|
||||
persistedName, currentTx, err := insertWorkspaceWithNameRetry(
|
||||
ctx,
|
||||
tx,
|
||||
|
||||
@@ -157,6 +157,8 @@ func TestWorkspaceBudget_Create_WithLimit(t *testing.T) {
|
||||
&budgetVal, // budget_limit ($10)
|
||||
models.DefaultMaxConcurrentTasks, // max_concurrent_tasks default
|
||||
"push", // delivery_mode default (#2339)
|
||||
(*string)(nil), // compute_instance_type default
|
||||
(*int)(nil), // compute_volume_root_gb default
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
|
||||
@@ -309,9 +309,31 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
|
||||
// RuntimeImages[Runtime] :latest lookup, which is what the dead
|
||||
// reader's sql.ErrNoRows path was producing already.
|
||||
Image: "",
|
||||
// Compute overrides (nullable — omitted = platform-managed default).
|
||||
// Issue #1686 Phase 1.
|
||||
InstanceType: extractComputeInstanceType(payload.Compute),
|
||||
VolumeRootGB: extractComputeVolumeRootGB(payload.Compute),
|
||||
}
|
||||
}
|
||||
|
||||
// extractComputeInstanceType returns the instance type from a ComputeConfig,
|
||||
// or nil when cfg is nil or the field is empty.
|
||||
func extractComputeInstanceType(cfg *models.ComputeConfig) *string {
|
||||
if cfg != nil && cfg.InstanceType != "" {
|
||||
return &cfg.InstanceType
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractComputeVolumeRootGB returns the root volume size from a ComputeConfig,
|
||||
// or nil when cfg is nil or the field is zero.
|
||||
func extractComputeVolumeRootGB(cfg *models.ComputeConfig) *int {
|
||||
if cfg != nil && cfg.Volume.RootGB != 0 {
|
||||
return &cfg.Volume.RootGB
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// issueAndInjectToken rotates the workspace auth token and injects the
|
||||
// plaintext into cfg.ConfigFiles[".auth_token"] so it is written into the
|
||||
// /configs volume by WriteFilesToContainer immediately after the container
|
||||
|
||||
@@ -779,6 +779,75 @@ func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildProvisionerConfig_ComputeOverrides verifies that #1686 Phase 1
|
||||
// compute fields (instance_type + volume.root_gb) are threaded from the
|
||||
// create payload into the provisioner config.
|
||||
func TestBuildProvisionerConfig_ComputeOverrides(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
|
||||
WithArgs("ws-compute").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"workspace_dir", "workspace_access"}).AddRow("", "none"))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
cfg := handler.buildProvisionerConfig(
|
||||
context.Background(),
|
||||
"ws-compute",
|
||||
"",
|
||||
nil,
|
||||
models.CreateWorkspacePayload{
|
||||
Tier: 2,
|
||||
Runtime: "python",
|
||||
Compute: &models.ComputeConfig{
|
||||
InstanceType: "g4dn.xlarge",
|
||||
Volume: models.ComputeVolume{RootGB: 256},
|
||||
},
|
||||
},
|
||||
nil,
|
||||
"",
|
||||
"workspace:ws-compute",
|
||||
)
|
||||
|
||||
if cfg.InstanceType == nil || *cfg.InstanceType != "g4dn.xlarge" {
|
||||
t.Errorf("InstanceType = %v, want g4dn.xlarge", cfg.InstanceType)
|
||||
}
|
||||
if cfg.VolumeRootGB == nil || *cfg.VolumeRootGB != 256 {
|
||||
t.Errorf("VolumeRootGB = %v, want 256", cfg.VolumeRootGB)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildProvisionerConfig_ComputeNil verifies backward compat: when the
|
||||
// payload omits compute, the provisioner config fields are nil so the CP
|
||||
// applies its own defaults.
|
||||
func TestBuildProvisionerConfig_ComputeNil(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
|
||||
WithArgs("ws-no-compute").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"workspace_dir", "workspace_access"}).AddRow("", "none"))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
cfg := handler.buildProvisionerConfig(
|
||||
context.Background(),
|
||||
"ws-no-compute",
|
||||
"",
|
||||
nil,
|
||||
models.CreateWorkspacePayload{Tier: 1, Runtime: "python"},
|
||||
nil,
|
||||
"",
|
||||
"workspace:ws-no-compute",
|
||||
)
|
||||
|
||||
if cfg.InstanceType != nil {
|
||||
t.Errorf("InstanceType = %v, want nil", cfg.InstanceType)
|
||||
}
|
||||
if cfg.VolumeRootGB != nil {
|
||||
t.Errorf("VolumeRootGB = %v, want nil", cfg.VolumeRootGB)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== issueAndInjectToken (issue #418) ====================
|
||||
|
||||
// TestIssueAndInjectToken_HappyPath verifies that on a normal (re)provision the
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -342,7 +343,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) {
|
||||
// Transaction begins, workspace INSERT fails, transaction is rolled back.
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
mock.ExpectRollback()
|
||||
|
||||
@@ -364,6 +365,94 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkspaceCreate_InvalidCompute verifies #1686 Phase 1 create-time
|
||||
// validation: bad instance_type or volume.root_gb returns 400 before any
|
||||
// DB call.
|
||||
func TestWorkspaceCreate_InvalidCompute(t *testing.T) {
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
body string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "instance_type too long",
|
||||
body: `{"name":"Bad Type","compute":{"instance_type":"` + strings.Repeat("x", 65) + `"}}`,
|
||||
want: "compute.instance_type too long",
|
||||
},
|
||||
{
|
||||
name: "root_gb too small",
|
||||
body: `{"name":"Small Disk","compute":{"volume":{"root_gb":16}}}`,
|
||||
want: "compute.volume.root_gb must be at least 32",
|
||||
},
|
||||
{
|
||||
name: "root_gb too large",
|
||||
body: `{"name":"Big Disk","compute":{"volume":{"root_gb":4096}}}`,
|
||||
want: "compute.volume.root_gb exceeds maximum 2048",
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(tc.body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), tc.want) {
|
||||
t.Errorf("body %q should contain %q", w.Body.String(), tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkspaceCreate_WithComputeOverrides verifies that valid #1686 Phase 1
|
||||
// compute fields are persisted into the workspaces table.
|
||||
func TestWorkspaceCreate_WithComputeOverrides(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectBegin()
|
||||
instanceType := "g4dn.xlarge"
|
||||
rootGB := 256
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "GPU Agent", nil, 3, "python", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", &instanceType, &rootGB).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`UPDATE workspaces SET status =`).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO workspace_config").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
body := `{"name":"GPU Agent","runtime":"python","compute":{"instance_type":"g4dn.xlarge","volume":{"root_gb":256}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Errorf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
@@ -375,7 +464,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
|
||||
// Expect workspace INSERT with defaulted tier=3 (Privileged — the
|
||||
// handler default in workspace.go), runtime="langgraph"
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
|
||||
@@ -423,7 +512,7 @@ func TestWorkspaceCreate_SaaSHardForcesTier4(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "SaaS External Agent", nil, 4, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "SaaS External Agent", nil, 4, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -464,7 +553,7 @@ func TestWorkspaceCreate_WithSecrets_Persists(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// Secret inserted inside the same transaction.
|
||||
mock.ExpectExec("INSERT INTO workspace_secrets").
|
||||
@@ -576,7 +665,7 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
// External URL update (localhost is explicitly allowed by validateAgentURL).
|
||||
@@ -615,7 +704,7 @@ func TestWorkspaceCreate_KimiRuntime_PreservesLabel(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
// Pre-register flow: awaiting_agent + runtime preserved as "kimi"
|
||||
@@ -1639,7 +1728,7 @@ runtime_config:
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes",
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1696,7 +1785,7 @@ model: anthropic:claude-sonnet-4-5
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), "Legacy Agent", nil, 3, "langgraph",
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1749,7 +1838,7 @@ runtime_config:
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), "Custom Hermes", nil, 3, "hermes",
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1855,7 +1944,7 @@ func TestWorkspaceCreate_188_NoTemplateNoRuntime_StillDefaultsLanggraph(t *testi
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Plain Default", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Plain Default", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1890,7 +1979,7 @@ func TestWorkspaceCreate_188_ExplicitRuntimeNoTemplate_OK(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Explicit Codex", nil, 3, "codex", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Explicit Codex", nil, 3, "codex", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push", (*string)(nil), (*int)(nil)).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
|
||||
@@ -3,6 +3,7 @@ package models
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -45,6 +46,10 @@ type Workspace struct {
|
||||
// forced to route updates through a parent workspace. Default true
|
||||
// (preserves existing behaviour for all workspaces).
|
||||
TalkToUserEnabled bool `json:"talk_to_user_enabled" db:"talk_to_user_enabled"`
|
||||
// Compute overrides (nullable — omitted = platform-managed default).
|
||||
// Issue #1686 Phase 1.
|
||||
ComputeInstanceType *string `json:"compute_instance_type,omitempty" db:"compute_instance_type"`
|
||||
ComputeVolumeRootGB *int `json:"compute_volume_root_gb,omitempty" db:"compute_volume_root_gb"`
|
||||
// Canvas layout fields (from JOIN)
|
||||
X float64 `json:"x"`
|
||||
Y float64 `json:"y"`
|
||||
@@ -154,6 +159,40 @@ type MemorySeed struct {
|
||||
Scope string `json:"scope" yaml:"scope"` // LOCAL, TEAM, GLOBAL
|
||||
}
|
||||
|
||||
// ComputeVolume holds per-workspace disk configuration.
|
||||
type ComputeVolume struct {
|
||||
RootGB int `json:"root_gb"`
|
||||
}
|
||||
|
||||
// ComputeConfig holds per-workspace EC2 compute overrides.
|
||||
// Omitted at create time means "use platform-managed defaults".
|
||||
type ComputeConfig struct {
|
||||
InstanceType string `json:"instance_type"`
|
||||
Volume ComputeVolume `json:"volume"`
|
||||
}
|
||||
|
||||
// ValidateComputeConfig performs create-time validation on compute overrides.
|
||||
// Returns nil when cfg is nil (omitted = platform-managed default).
|
||||
func ValidateComputeConfig(cfg *ComputeConfig) error {
|
||||
if cfg == nil {
|
||||
return nil
|
||||
}
|
||||
if cfg.InstanceType != "" {
|
||||
if len(cfg.InstanceType) > 64 {
|
||||
return fmt.Errorf("compute.instance_type too long (max 64 chars)")
|
||||
}
|
||||
}
|
||||
if cfg.Volume.RootGB != 0 {
|
||||
if cfg.Volume.RootGB < 32 {
|
||||
return fmt.Errorf("compute.volume.root_gb must be at least 32")
|
||||
}
|
||||
if cfg.Volume.RootGB > 2048 {
|
||||
return fmt.Errorf("compute.volume.root_gb exceeds maximum 2048")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type CreateWorkspacePayload struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Role string `json:"role"`
|
||||
@@ -180,6 +219,9 @@ type CreateWorkspacePayload struct {
|
||||
// MaxConcurrentTasks caps parallel A2A + cron dispatch. 0 means use
|
||||
// DefaultMaxConcurrentTasks. Leaders typically set 3.
|
||||
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
|
||||
// Compute is an optional per-workspace EC2 shape override.
|
||||
// Omitted = platform-managed default (current behaviour).
|
||||
Compute *ComputeConfig `json:"compute,omitempty"`
|
||||
Canvas struct {
|
||||
X float64 `json:"x"`
|
||||
Y float64 `json:"y"`
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
package models
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestValidateComputeConfig_NilIsValid(t *testing.T) {
|
||||
if err := ValidateComputeConfig(nil); err != nil {
|
||||
t.Errorf("nil compute config should be valid, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateComputeConfig_EmptyIsValid(t *testing.T) {
|
||||
cfg := &ComputeConfig{}
|
||||
if err := ValidateComputeConfig(cfg); err != nil {
|
||||
t.Errorf("empty compute config should be valid, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateComputeConfig_ValidOverrides(t *testing.T) {
|
||||
cfg := &ComputeConfig{
|
||||
InstanceType: "g4dn.xlarge",
|
||||
Volume: ComputeVolume{RootGB: 256},
|
||||
}
|
||||
if err := ValidateComputeConfig(cfg); err != nil {
|
||||
t.Errorf("valid overrides should pass, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateComputeConfig_InstanceTypeTooLong(t *testing.T) {
|
||||
longName := string(make([]byte, 65))
|
||||
for i := range longName {
|
||||
longName = longName[:i] + "x" + longName[i+1:]
|
||||
}
|
||||
cfg := &ComputeConfig{InstanceType: longName}
|
||||
if err := ValidateComputeConfig(cfg); err == nil {
|
||||
t.Error("expected error for instance_type > 64 chars")
|
||||
} else if err.Error() != "compute.instance_type too long (max 64 chars)" {
|
||||
t.Errorf("unexpected error message: %q", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateComputeConfig_RootGBTooSmall(t *testing.T) {
|
||||
cfg := &ComputeConfig{Volume: ComputeVolume{RootGB: 31}}
|
||||
if err := ValidateComputeConfig(cfg); err == nil {
|
||||
t.Error("expected error for root_gb < 32")
|
||||
} else if err.Error() != "compute.volume.root_gb must be at least 32" {
|
||||
t.Errorf("unexpected error message: %q", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateComputeConfig_RootGBTooLarge(t *testing.T) {
|
||||
cfg := &ComputeConfig{Volume: ComputeVolume{RootGB: 2049}}
|
||||
if err := ValidateComputeConfig(cfg); err == nil {
|
||||
t.Error("expected error for root_gb > 2048")
|
||||
} else if err.Error() != "compute.volume.root_gb exceeds maximum 2048" {
|
||||
t.Errorf("unexpected error message: %q", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateComputeConfig_BoundaryValues(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
cfg ComputeConfig
|
||||
ok bool
|
||||
}{
|
||||
{"min root_gb", ComputeConfig{Volume: ComputeVolume{RootGB: 32}}, true},
|
||||
{"max root_gb", ComputeConfig{Volume: ComputeVolume{RootGB: 2048}}, true},
|
||||
{"just under min", ComputeConfig{Volume: ComputeVolume{RootGB: 31}}, false},
|
||||
{"just over max", ComputeConfig{Volume: ComputeVolume{RootGB: 2049}}, false},
|
||||
{"exactly 64 char type", ComputeConfig{InstanceType: string(make([]byte, 64))}, true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// fill the 64-char case with 'x'
|
||||
if tc.cfg.InstanceType != "" {
|
||||
b := make([]byte, len(tc.cfg.InstanceType))
|
||||
for i := range b {
|
||||
b[i] = 'x'
|
||||
}
|
||||
tc.cfg.InstanceType = string(b)
|
||||
}
|
||||
err := ValidateComputeConfig(&tc.cfg)
|
||||
if tc.ok && err != nil {
|
||||
t.Errorf("expected valid, got: %v", err)
|
||||
}
|
||||
if !tc.ok && err == nil {
|
||||
t.Error("expected invalid, got nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -163,6 +163,10 @@ type cpProvisionRequest struct {
|
||||
// collectCPConfigFiles which rejects symlinks and non-regular files
|
||||
// before including them. Serialised as base64 to avoid JSON escaping.
|
||||
ConfigFiles map[string]string `json:"config_files,omitempty"`
|
||||
// Compute overrides (nullable — omitted = platform-managed default).
|
||||
// Issue #1686 Phase 1.
|
||||
InstanceType *string `json:"instance_type,omitempty"`
|
||||
VolumeRootGB *int `json:"volume_root_gb,omitempty"`
|
||||
}
|
||||
|
||||
type cpProvisionResponse struct {
|
||||
@@ -206,13 +210,15 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
|
||||
}
|
||||
|
||||
req := cpProvisionRequest{
|
||||
OrgID: p.orgID,
|
||||
WorkspaceID: cfg.WorkspaceID,
|
||||
Runtime: cfg.Runtime,
|
||||
Tier: cfg.Tier,
|
||||
PlatformURL: cfg.PlatformURL,
|
||||
Env: env,
|
||||
ConfigFiles: configFiles,
|
||||
OrgID: p.orgID,
|
||||
WorkspaceID: cfg.WorkspaceID,
|
||||
Runtime: cfg.Runtime,
|
||||
Tier: cfg.Tier,
|
||||
PlatformURL: cfg.PlatformURL,
|
||||
Env: env,
|
||||
ConfigFiles: configFiles,
|
||||
InstanceType: cfg.InstanceType,
|
||||
VolumeRootGB: cfg.VolumeRootGB,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(req)
|
||||
|
||||
@@ -1062,3 +1062,75 @@ func TestCollectCPConfigFiles_RejectsRootSymlink(t *testing.T) {
|
||||
t.Errorf("expected symlink-related error, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStart_ComputeOverrides — when WorkspaceConfig carries InstanceType and
|
||||
// VolumeRootGB, they must be forwarded in the cpProvisionRequest body so the
|
||||
// CP can pass them to EC2 RunInstances. Regression guard for #1686 Phase 1.
|
||||
func TestStart_ComputeOverrides(t *testing.T) {
|
||||
var gotBody cpProvisionRequest
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := json.NewDecoder(r.Body).Decode(&gotBody); err != nil {
|
||||
t.Errorf("decode request: %v", err)
|
||||
}
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
_, _ = io.WriteString(w, `{"instance_id":"i-compute","state":"pending"}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
|
||||
instanceType := "g4dn.xlarge"
|
||||
volumeRootGB := 256
|
||||
_, err := p.Start(context.Background(), WorkspaceConfig{
|
||||
WorkspaceID: "ws-1",
|
||||
Runtime: "python",
|
||||
Tier: 2,
|
||||
PlatformURL: "http://tenant",
|
||||
InstanceType: &instanceType,
|
||||
VolumeRootGB: &volumeRootGB,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Start: %v", err)
|
||||
}
|
||||
if gotBody.InstanceType == nil || *gotBody.InstanceType != "g4dn.xlarge" {
|
||||
t.Errorf("instance_type = %v, want g4dn.xlarge", gotBody.InstanceType)
|
||||
}
|
||||
if gotBody.VolumeRootGB == nil || *gotBody.VolumeRootGB != 256 {
|
||||
t.Errorf("volume_root_gb = %v, want 256", gotBody.VolumeRootGB)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStart_ComputeOmittedWhenNil — when WorkspaceConfig has no compute
|
||||
// overrides, the JSON body must omit the keys entirely (omitempty) so CP
|
||||
// applies its own defaults rather than empty/zero values.
|
||||
func TestStart_ComputeOmittedWhenNil(t *testing.T) {
|
||||
var raw json.RawMessage
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
|
||||
t.Errorf("decode request: %v", err)
|
||||
}
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
_, _ = io.WriteString(w, `{"instance_id":"i-default","state":"pending"}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
|
||||
_, err := p.Start(context.Background(), WorkspaceConfig{
|
||||
WorkspaceID: "ws-1",
|
||||
Runtime: "python",
|
||||
Tier: 1,
|
||||
PlatformURL: "http://tenant",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Start: %v", err)
|
||||
}
|
||||
var decoded map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &decoded); err != nil {
|
||||
t.Fatalf("unmarshal raw body: %v", err)
|
||||
}
|
||||
if _, ok := decoded["instance_type"]; ok {
|
||||
t.Errorf("instance_type should be omitted when nil")
|
||||
}
|
||||
if _, ok := decoded["volume_root_gb"]; ok {
|
||||
t.Errorf("volume_root_gb should be omitted when nil")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,6 +105,11 @@ type WorkspaceConfig struct {
|
||||
WorkspaceAccess string // #65: "none" (default), "read_only", or "read_write"
|
||||
ResetClaudeSession bool // #12: if true, discard the claude-sessions volume before start (fresh session dir)
|
||||
|
||||
// Compute overrides (nullable — omitted = platform-managed default).
|
||||
// Issue #1686 Phase 1.
|
||||
InstanceType *string `json:"instance_type,omitempty"`
|
||||
VolumeRootGB *int `json:"volume_root_gb,omitempty"`
|
||||
|
||||
// Image, when non-empty, overrides the runtime→image lookup. CP
|
||||
// (molecule-controlplane) is the single SSOT for runtime image digest
|
||||
// pins via its migrations/027_runtime_image_pins table — the pin is
|
||||
@@ -726,6 +731,16 @@ func buildContainerEnv(cfg WorkspaceConfig) []string {
|
||||
}
|
||||
env = append(env, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
// #1687: alias GH_PAT → GH_TOKEN / GITHUB_TOKEN on the READ side
|
||||
// (container env assembly). gh CLI and git credential helpers look
|
||||
// for these standard names; by aliasing here we avoid writing the
|
||||
// forbidden keys into tenant-writer surfaces (workspace_secrets,
|
||||
// envVars map, etc.). GH_PAT itself is not an SCM-write credential
|
||||
// and passes through cfg.EnvVars untouched.
|
||||
if pat, hasPAT := cfg.EnvVars["GH_PAT"]; hasPAT && pat != "" {
|
||||
env = append(env, fmt.Sprintf("GH_TOKEN=%s", pat))
|
||||
env = append(env, fmt.Sprintf("GITHUB_TOKEN=%s", pat))
|
||||
}
|
||||
// Inject ADMIN_TOKEN from the platform server's environment so workspace
|
||||
// containers can call /admin/liveness and other admin-gated endpoints
|
||||
// (core#831). cp_provisioner.go handles this separately for SaaS tenants.
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
# T4 privilege contract — generated from
|
||||
# molecule-ai/molecule-core workspace-server/internal/provisioner/t4_privilege_contract.go
|
||||
# RFC: molecule-ai/internal#456
|
||||
# Do NOT edit this file by hand; regenerate via `go run ./cmd/t4-contract-dump > t4_capabilities.yaml`.
|
||||
version: 1
|
||||
agent_uid: 1000
|
||||
capabilities:
|
||||
- name: "agent_home_writable"
|
||||
description: "/agent-home is writable by the agent (Files API split per task #128). The Files API redesign uses /agent-home as the user-writable root; the agent must be able to create files there without sudo."
|
||||
severity: hard
|
||||
source: "task #128 Files API redesign; memory reference_post_suspension_pipeline"
|
||||
probe: "TF=/agent-home/.t4-cap-write-probe-${MOLECULE_T4_PROBE_ID:-$$}; echo ok > \"$TF\" && [ \"$(cat \"$TF\")\" = \"ok\" ] && rm -f \"$TF\""
|
||||
- name: "agent_uid_1000"
|
||||
description: "The container's primary process (the runtime, post-gosu) runs as uid 1000, not root. T4 grants full machine access via privileged + host PID + Docker socket — the WORKLOAD inside that privileged container must still be unprivileged to prevent every untrusted code execution from being trivially root-on-host."
|
||||
severity: hard
|
||||
source: "RFC internal#456 §2.1.2; memory feedback_hermes_listpeers_401_token_root600_unreadable_by_uid1000"
|
||||
probe: "[ \"$(id -u)\" = \"1000\" ]"
|
||||
- name: "auth_token_agent_owned"
|
||||
description: "/configs/.auth_token is owned by uid 1000 (== AgentUID) so the a2a_mcp_server can read its bearer. In SaaS mode molecule-runtime itself writes the token via save_token() — the ownership equals the runtime's exec uid. If the runtime ever runs as root, this fails and list_peers 401s (the Hermes class bug)."
|
||||
severity: hard
|
||||
source: "RFC internal#456 §10; memory feedback_hermes_listpeers_401_token_root600_unreadable_by_uid1000"
|
||||
probe: "[ -e /configs/.auth_token ] && [ \"$(stat -c '%u' /configs/.auth_token)\" = \"1000\" ]"
|
||||
- name: "docker_socket_reachable"
|
||||
description: "/var/run/docker.sock is bind-mounted and host Docker is reachable from the T4 container. The probe enters the host mount+PID namespaces before running docker info so it validates the same host-control path production agents use, instead of depending on the template image's Docker CLI/socket group details."
|
||||
severity: hard
|
||||
source: "provisioner.go applyHostConfig T4 branch (case 4)"
|
||||
probe: "sudo -n nsenter --target 1 --mount --pid -- docker info >/dev/null 2>&1"
|
||||
- name: "host_fs_write_readback"
|
||||
description: "Host filesystem is mounted at /host and the agent can write+read+remove a file there via sudo. Proves real host reach (not just a PID-1 namespace trick on an isolated init)."
|
||||
severity: hard
|
||||
source: "RFC internal#456 §11"
|
||||
probe: "MARKER=\"t4cap-$(date +%s)-$RANDOM\"; PROBE_FILE=\"/host/tmp/.t4-cap-probe-${MOLECULE_T4_PROBE_ID:-$$}\"; sudo -n sh -c \"echo $MARKER > $PROBE_FILE\" && [ \"$(sudo -n cat $PROBE_FILE)\" = \"$MARKER\" ] && sudo -n rm -f $PROBE_FILE"
|
||||
- name: "host_root_reach_via_nsenter"
|
||||
description: "The uid-1000 agent can attain host root via `sudo -n nsenter --target 1 --mount --pid -- id -u` returning 0. This is the T4 escalation leg: full machine access means the agent CAN escalate to host root deliberately, even though it does not run as root by default."
|
||||
severity: hard
|
||||
source: "RFC internal#456 §11; memory reference_per_template_privilege_contract_class_audit_2026_05_16"
|
||||
probe: "[ \"$(sudo -n nsenter --target 1 --mount --pid -- id -u)\" = \"0\" ]"
|
||||
- name: "list_peers_http_200"
|
||||
description: "The platform list_peers HTTP endpoint (served by the in-container a2a_mcp_server) returns HTTP 200 when called from uid 1000 with the bearer from /configs/.auth_token. This proves the WHOLE token-ownership chain end-to-end: token written under correct uid → reader uid matches → bearer non-empty → platform accepts. A self-contained empirical test for the Hermes class bug."
|
||||
severity: hard
|
||||
source: "memory reference_openclaw_fresh_provision_nonfunctional_anthropic_default_unroutable; memory reference_openclaw_mcp_peer_wiring_rootcause"
|
||||
probe: "BEARER=$(cat /configs/.auth_token 2>/dev/null || echo \"\"); [ -n \"$BEARER\" ] || exit 1; PORT=$(cat /configs/.platform_port 2>/dev/null || echo \"8080\"); STATUS=$(curl -sS -o /dev/null -w '%{http_code}' -H \"Authorization: Bearer $BEARER\" \"http://127.0.0.1:${PORT}/list_peers\"); [ \"$STATUS\" = \"200\" ]"
|
||||
- name: "network_egress_https"
|
||||
description: "Generic HTTPS egress works. T4 is unconstrained network; the canonical test target is the Molecule-owned Gitea middleman over its public name. CI must not depend on GitHub or other mirrors for this probe. Any reachable HTTPS endpoint satisfies it — the YAML carries the recommended targets but accepts any 200/301/302."
|
||||
severity: hard
|
||||
source: "task #174 brief"
|
||||
probe: "for U in $MOLECULE_T4_EGRESS_TARGETS; do C=$(curl -sS -o /dev/null -w '%{http_code}' --max-time 8 \"$U\"); case \"$C\" in 2*|3*) exit 0;; esac; done; exit 1"
|
||||
required_egress:
|
||||
- "https://git.moleculesai.app/api/v1/version"
|
||||
- name: "pid_host_visible"
|
||||
description: "Host PID namespace is shared (--pid=host). The container can see host process 1 (systemd or pid-1 on the EC2 instance). Required for nsenter into host mount/pid namespaces."
|
||||
severity: hard
|
||||
source: "provisioner.go applyHostConfig T4 branch (case 4): hostCfg.PidMode = 'host'"
|
||||
probe: "[ \"$(sudo -n nsenter --target 1 --mount --pid -- id -u)\" = \"0\" ]"
|
||||
- name: "privileged_flag_observable"
|
||||
description: "Container is started with --privileged. Observable from inside via /proc/self/status CapEff containing CAP_SYS_ADMIN. Defense-in-depth for the provisioner emission side."
|
||||
severity: advisory
|
||||
source: "provisioner.go applyHostConfig T4 branch (case 4)"
|
||||
probe: "grep -q '^CapEff:.*ffffffffff' /proc/self/status"
|
||||
@@ -120,8 +120,8 @@ func T4PrivilegeContract() []T4Capability {
|
||||
},
|
||||
{
|
||||
Name: "docker_socket_reachable",
|
||||
Description: "/var/run/docker.sock is bind-mounted into the container so the agent can manage other containers (T4 use case: agent-as-orchestrator). Proven by 'docker version' returning a server section, which requires the daemon to answer over the socket.",
|
||||
Probe: `sudo -n docker version --format '{{.Server.Version}}' >/dev/null 2>&1`,
|
||||
Description: "/var/run/docker.sock is bind-mounted and host Docker is reachable from the T4 container. The probe enters the host mount+PID namespaces before running docker info so it validates the same host-control path production agents use, instead of depending on the template image's Docker CLI/socket group details.",
|
||||
Probe: `sudo -n nsenter --target 1 --mount --pid -- docker info >/dev/null 2>&1`,
|
||||
Severity: SeverityHard,
|
||||
Source: "provisioner.go applyHostConfig T4 branch (case 4)",
|
||||
},
|
||||
@@ -145,7 +145,7 @@ func T4PrivilegeContract() []T4Capability {
|
||||
},
|
||||
{
|
||||
Name: "network_egress_https",
|
||||
Description: "Generic HTTPS egress works. T4 is unconstrained network; the canonical test target is the Gitea instance over its public name, which any fork user can also resolve. Any reachable HTTPS endpoint satisfies it — the YAML carries the recommended targets but accepts any 200/301/302.",
|
||||
Description: "Generic HTTPS egress works. T4 is unconstrained network; the canonical test target is the Molecule-owned Gitea middleman over its public name. CI must not depend on GitHub or other mirrors for this probe. Any reachable HTTPS endpoint satisfies it — the YAML carries the recommended targets but accepts any 200/301/302.",
|
||||
Probe: `for U in $MOLECULE_T4_EGRESS_TARGETS; do ` +
|
||||
` C=$(curl -sS -o /dev/null -w '%{http_code}' --max-time 8 "$U"); ` +
|
||||
` case "$C" in 2*|3*) exit 0;; esac; ` +
|
||||
@@ -153,10 +153,9 @@ func T4PrivilegeContract() []T4Capability {
|
||||
Severity: SeverityHard,
|
||||
Source: "task #174 brief",
|
||||
RequiredEgress: []string{
|
||||
// Public, no auth, returns a small JSON.
|
||||
// Molecule-owned, public, no auth, returns a small JSON.
|
||||
// Adopters override via MOLECULE_T4_EGRESS_TARGETS.
|
||||
"https://api.github.com/zen",
|
||||
"https://www.google.com/generate_204",
|
||||
"https://git.moleculesai.app/api/v1/version",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -169,7 +168,7 @@ func T4PrivilegeContract() []T4Capability {
|
||||
{
|
||||
Name: "pid_host_visible",
|
||||
Description: "Host PID namespace is shared (--pid=host). The container can see host process 1 (systemd or pid-1 on the EC2 instance). Required for nsenter into host mount/pid namespaces.",
|
||||
Probe: `[ -d /proc/1/root ] && [ "$(sudo -n readlink /proc/1/ns/pid)" = "$(sudo -n readlink /proc/self/ns/pid)" ]`,
|
||||
Probe: `[ "$(sudo -n nsenter --target 1 --mount --pid -- id -u)" = "0" ]`,
|
||||
Severity: SeverityHard,
|
||||
Source: "provisioner.go applyHostConfig T4 branch (case 4): hostCfg.PidMode = 'host'",
|
||||
},
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package provisioner
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
@@ -77,6 +78,19 @@ func TestT4PrivilegeContract_CoreCapabilitiesPresent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestT4PrivilegeContract_DefaultEgressUsesMoleculeOwnedEndpoint(t *testing.T) {
|
||||
for _, c := range T4PrivilegeContract() {
|
||||
for _, target := range c.RequiredEgress {
|
||||
if strings.Contains(target, "github.com") {
|
||||
t.Errorf("capability %q default egress target must not depend on GitHub mirror/API: %s", c.Name, target)
|
||||
}
|
||||
if strings.Contains(target, "google.com") {
|
||||
t.Errorf("capability %q default egress target must not depend on external Google endpoint: %s", c.Name, target)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestT4PrivilegeContract_HardCapabilitiesMajority sanity-checks that
|
||||
// the contract is not silently advisory-only. If someone marks
|
||||
// everything as "advisory" the gate becomes a no-op without anyone
|
||||
@@ -142,6 +156,17 @@ func TestAsYAML_EscapesEmbeddedQuotes(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGeneratedT4CapabilitiesYAMLMatchesSSOT(t *testing.T) {
|
||||
got, err := os.ReadFile("t4_capabilities.yaml")
|
||||
if err != nil {
|
||||
t.Fatalf("read generated t4_capabilities.yaml: %v", err)
|
||||
}
|
||||
want := AsYAML(T4PrivilegeContract())
|
||||
if string(got) != want {
|
||||
t.Fatal("generated t4_capabilities.yaml drifted from T4PrivilegeContract; regenerate with `go run ./cmd/t4-contract-dump > internal/provisioner/t4_capabilities.yaml`")
|
||||
}
|
||||
}
|
||||
|
||||
// TestAgentUIDConsistency ties the contract to the existing
|
||||
// provisioner-side AgentUID const. The probe for "agent_uid_1000"
|
||||
// hard-codes `id -u == 1000`; if AgentUID ever changes (no one
|
||||
|
||||
@@ -397,6 +397,8 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
wsAuth.GET("/tokens", tokh.List)
|
||||
wsAuth.POST("/tokens", tokh.Create)
|
||||
wsAuth.DELETE("/tokens/:tokenId", tokh.Revoke)
|
||||
adminTokH := handlers.NewAdminWorkspaceTokenHandler()
|
||||
r.POST("/admin/workspaces/:id/tokens", middleware.AdminAuth(db.DB), adminTokH.Create)
|
||||
|
||||
// Memory
|
||||
memh := handlers.NewMemoryHandler()
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
ALTER TABLE workspaces
|
||||
DROP COLUMN IF EXISTS compute_instance_type;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
DROP COLUMN IF EXISTS compute_volume_root_gb;
|
||||
@@ -0,0 +1,10 @@
|
||||
-- Per-workspace EC2 compute configuration (#1686 Phase 1).
|
||||
-- Allows callers to override instance_type and root volume size
|
||||
-- at workspace creation time. Omitted/null values preserve the
|
||||
-- platform-managed default (current behaviour), so this is fully
|
||||
-- backwards-compatible.
|
||||
ALTER TABLE workspaces
|
||||
ADD COLUMN IF NOT EXISTS compute_instance_type TEXT;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ADD COLUMN IF NOT EXISTS compute_volume_root_gb INTEGER;
|
||||
Reference in New Issue
Block a user