forked from molecule-ai/molecule-core
Merge branch 'main' into feat/issue-63-local-build-from-gitea-v2
commit 6bb272360d
.github/workflows/auto-promote-staging.yml
@ -2,61 +2,148 @@ name: Auto-promote staging → main
# Fires after any of the staging-branch quality gates complete. When ALL
# required gates are green on the same staging SHA, opens (or re-uses)
# a PR `staging → main` and enables auto-merge so the merge queue lands
# it. Closes the gap that historically let features sit on staging for
# weeks waiting for a bulk promotion PR (see molecule-core#1496 for the
# 1172-commit example).
# a PR `staging → main` and schedules Gitea auto-merge so the PR lands
# automatically once approval + status checks are satisfied.
#
# 2026-04-28 rewrite (PR #142): the previous version did a direct
# `git merge --ff-only origin staging && git push origin main`. That
# breaks against main's branch-protection ruleset, which requires
# status checks "set by the expected GitHub apps" — direct pushes
# can't satisfy that condition (only PR merges through the queue can).
# The workflow was failing every tick with:
# remote: error: GH006: Protected branch update failed for refs/heads/main.
# remote: - Required status checks ... were not set by the expected GitHub apps.
# Fix: mirror the PR-based pattern from auto-sync-main-to-staging.yml
# (the reverse-direction sync, fixed in #2234 for the same reason).
# Both directions now use the same merge-queue path that humans use,
# no special-case bypass.
# ============================================================
# What this workflow does
# ============================================================
#
# Safety model:
# - Runs ONLY on workflow_run events for the staging branch.
# - Requires EVERY named gate workflow to have the same head_sha and
# all be `conclusion == success`. If any of them is red, skipped,
# cancelled, or pending, we abort (stay on the current main).
# - The PR base=main head=staging path lets GitHub itself enforce
# branch protection. If main has diverged from staging or required
# checks aren't satisfied, the merge queue declines the PR — no
# need for a manual ff-only ancestry check here.
# - Loop safety: the auto-sync-main-to-staging workflow fires when
# main lands the auto-promote PR, but its merge into staging is by
# GITHUB_TOKEN which doesn't trigger downstream workflow_run events
# (GitHub Actions safety). So this workflow doesn't re-fire from
# its own promote landing.
# 1. On a workflow_run completion event for one of the staging gate
# workflows (CI, E2E Staging Canvas, E2E API Smoke, CodeQL),
# checks if the combined status on the staging head SHA is green.
# 2. If green, opens (or re-uses) a PR `head: staging → base: main`
# via Gitea REST `POST /api/v1/repos/.../pulls`.
# 3. Schedules auto-merge via `POST /api/v1/repos/.../pulls/{index}/merge`
# with `merge_when_checks_succeed: true`. Gitea waits for the
# approval requirement on `main` (`required_approvals: 1`) and
# the status-check gates, then merges.
# 4. The merge commit lands on `main` and fires
# `publish-workspace-server-image.yml` naturally via its
# `on: push: branches: [main]` trigger — no explicit dispatch
# needed (see "Why no workflow_dispatch tail" below).
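As a rough illustration of steps 2–3 above, the two Gitea REST calls boil down to something like the following sketch (assuming GITEA_HOST, GITEA_TOKEN and REPO are set the same way the workflow sets them below; the PR index 123 is a placeholder, the real workflow reads it from the create response):

# Step 2 — open the promote PR (sketch)
curl -sS -X POST \
  -H "Authorization: token ${GITEA_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"title":"staging → main: auto-promote","base":"main","head":"staging"}' \
  "${GITEA_HOST}/api/v1/repos/${REPO}/pulls"

# Step 3 — schedule auto-merge on the returned PR index
curl -sS -X POST \
  -H "Authorization: token ${GITEA_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"Do":"merge","merge_when_checks_succeed":true}' \
  "${GITEA_HOST}/api/v1/repos/${REPO}/pulls/123/merge"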
#
# Toggle via repo variable AUTO_PROMOTE_ENABLED (true/unset). When
# unset, the workflow logs what it would have done but doesn't open
# the PR — useful for dry-running the gate logic without surfacing
# a noisy PR while staging CI is still flaky.
# `auto-sync-main-to-staging.yml` is the reverse-direction
# counterpart (main → staging, fast-forward push). Together they
# keep the staging-superset-of-main invariant tight.
#
# **One-time repo setting (load-bearing):** this workflow opens the
# staging→main PR via `gh pr create` using the default GITHUB_TOKEN.
# Since GitHub's 2022 default change, that token cannot create or
# approve PRs unless the repo opts in. The toggle is at:
# ============================================================
# Why Gitea REST (and not `gh pr create`)
# ============================================================
#
# Settings → Actions → General → Workflow permissions
# → ✅ Allow GitHub Actions to create and approve pull requests
# Pre-2026-05-06 this workflow used `gh pr create`, `gh pr merge --auto`,
# `gh run list`, and `gh workflow run` against GitHub. After the
# GitHub→Gitea cutover those calls fail because:
#
# Without it, every workflow_run fails with:
# - `gh pr create / merge / view / list` route to GitHub GraphQL
# (`/api/graphql`). Gitea does not expose a GraphQL endpoint;
# every call returns `HTTP 405 Method Not Allowed` — same root
# cause as #65 (auto-sync) which PR #66 fixed by dropping `gh`
# entirely.
# - `gh run list --workflow=...` is GitHub-shaped; Gitea has the
# simpler `GET /repos/.../commits/{ref}/status` combined-status
# endpoint instead.
# - `gh workflow run X.yml` calls `POST /repos/.../actions/workflows/{id}/dispatches`,
# which does NOT exist on Gitea 1.22.6 (verified via swagger.v1.json).
#
# pull request create failed: GraphQL: GitHub Actions is not
# permitted to create or approve pull requests (createPullRequest)
# So this workflow uses direct `curl` calls to Gitea REST. No `gh`
# CLI dependency, no GraphQL, no missing-endpoint footgun.
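For reference, the combined-status check the gate job below builds on is a single call; a minimal sketch, using the same env vars as the workflow (`state` comes back as success/pending/failure/error per the Gitea API):

curl -sS -H "Authorization: token ${GITEA_TOKEN}" \
  "${GITEA_HOST}/api/v1/repos/${REPO}/commits/${HEAD_SHA}/status" \
  | jq -r '"state=\(.state) contexts=\(.total_count)"'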
#
# Observed 2026-04-29 01:43 UTC blocking promotion of fcd87b9 (PRs
# #2248 + #2249); manually bridged via PR #2252. Re-check this
# setting if auto-promote starts failing with createPullRequest
# errors after a repo or org admin change.
# ============================================================
# Why no workflow_dispatch tail (was load-bearing on GitHub, dead on Gitea)
# ============================================================
#
# The GitHub-era version had a 60-line polling step that waited for
# the promote PR to merge, then explicitly dispatched
# `publish-workspace-server-image.yml` on `--ref main`. That step
# existed because GitHub's GITHUB_TOKEN-initiated merges suppress
# downstream `on: push` workflows (the documented "no recursion" rule
# — https://docs.github.com/en/actions/using-workflows/triggering-a-workflow#triggering-a-workflow-from-a-workflow).
# The explicit dispatch was the workaround.
#
# Gitea Actions does NOT have this no-recursion rule. PR #66's auto-
# sync merge to main fired `auto-promote-staging` on the next push
# trigger naturally. So the cascade fires on the natural push event;
# the explicit dispatch is dead code. (And even if we wanted to
# preserve it, Gitea has no `workflow_dispatch` REST endpoint.)
#
# Removed in this rewrite. If we ever observe the cascade misfire,
# an operator can push an empty commit to `main` to wake it.
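The "no `workflow_dispatch` REST endpoint" claim is easy to re-verify against the instance's own spec; a sketch, assuming the Gitea instance serves its Swagger document at the default /swagger.v1.json path:

curl -sS "${GITEA_HOST}/swagger.v1.json" \
  | jq '[.paths | keys[] | select(test("dispatches"))]'
# An empty array means no /actions/workflows/{id}/dispatches route exists.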
#
# ============================================================
# Why open a PR (and not direct push)
# ============================================================
#
# `main` branch protection has `enable_push: false` with NO
# `push_whitelist_usernames`. Direct push is impossible for any
# persona, including admins. PR-mediated merge is the only path,
# which is intentional: prod state mutations (and staging→main IS a
# prod mutation, since the next deploy fans out to tenants) require
# Hongming's approval per `feedback_prod_apply_needs_hongming_chat_go`.
#
# The auto-merge schedule preserves this gate: `merge_when_checks_succeed`
# does NOT bypass `required_approvals: 1`. Gitea waits for BOTH
# approval AND green checks before merging. Hongming reviews via the
# canvas/chat-handle of the PR notification, approves, and Gitea
# auto-merges within seconds.
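The protection settings quoted above can be read back from the API when auditing this; a sketch using the Gitea branch-protection endpoint (field names per the 1.22 API docs — verify against the instance's swagger if in doubt):

curl -sS -H "Authorization: token ${GITEA_TOKEN}" \
  "${GITEA_HOST}/api/v1/repos/${REPO}/branch_protections/main" \
  | jq '{enable_push, required_approvals, push_whitelist_usernames, status_check_contexts}'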
#
# ============================================================
# Identity + token (anti-bot-ring per saved-memory
# `feedback_per_agent_gitea_identity_default`)
# ============================================================
#
# This workflow uses `secrets.AUTO_SYNC_TOKEN` — a personal access
# token issued to the `devops-engineer` Gitea persona. NOT the
# founder PAT. The bot-ring fingerprint that triggered the GitHub
# org suspension on 2026-05-06 was characterised by the founder PAT
# acting as CI at machine speed.
#
# Token scope: `push: true` (read+write) on this repo. The persona
# can: open PRs, comment on PRs, schedule auto-merge. The persona
# CANNOT bypass main's branch protection (`required_approvals: 1`
# still applies — only Hongming's review unblocks merge).
#
# Authorship: the PR is opened by `devops-engineer`; the merge
# commit credits Hongming-as-approver and `devops-engineer` as
# the merger.
#
# ============================================================
# Failure modes & operational notes
# ============================================================
#
# A — staging gates not all green at trigger time:
# - The combined-status check returns `state: pending|failure`.
# Workflow exits 0 with a step-summary "not all green; staying
# on current main". Re-fires on the next gate completion.
#
# B — Gitea PR-create returns non-201 (e.g. 422 already-exists):
# - Idempotent: the workflow first GETs the existing open
# staging→main PR. If found, reuse it; if not, POST a new one.
# 422 should never surface; if it does (race), step summary
# captures the body and the next workflow_run picks up.
#
# C — `merge_when_checks_succeed` schedule fails:
# - 422 with "Pull request is not mergeable" if there are
# conflicts or stale base. Step summary surfaces it; operator
# (or `auto-sync-main-to-staging`) needs to bring staging up
# to date with main first. Workflow exits 1 to surface red.
#
# D — `AUTO_SYNC_TOKEN` rotated / wrong scope:
# - 401/403 on first REST call. Step summary surfaces it.
# Re-issue the token from `~/.molecule-ai/personas/` on the
# operator host and update the repo Actions secret.
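A quick way to tell a rotated/revoked token from a scope problem before re-issuing (illustrative; `GET /api/v1/user` simply reports which persona the token authenticates as):

curl -sS -H "Authorization: token ${AUTO_SYNC_TOKEN}" \
  "${GITEA_HOST}/api/v1/user" | jq -r '.login // .message'
# Expected output: devops-engineer. An error message here means the token itself
# is invalid; a correct login but 403s on repo endpoints points at scope instead.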
#
# ============================================================
# Loop safety
# ============================================================
#
# When the promote PR merges to main, `auto-sync-main-to-staging.yml`
# fires (on:push:main) and pushes the merge commit back to staging.
# That push to staging is by `devops-engineer`, NOT this workflow's
# token, and triggers the staging gate workflows. When they all
# complete, we end up back here — but the tree-diff guard catches
# it: staging tree == main tree (the merge commit changes nothing),
# so we skip and the cycle terminates.
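The same tree equality can be checked by hand when debugging a suspected cycle; a local sketch:

git fetch origin main staging
if [ "$(git rev-parse 'origin/staging^{tree}')" = "$(git rev-parse 'origin/main^{tree}')" ]; then
  echo "identical trees — nothing to promote"
fi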
on:
workflow_run:
@ -74,26 +161,16 @@ on:
default: "false"
permissions:
contents: write
contents: read
pull-requests: write
# actions: write is needed by the post-merge dispatch tail step
# (#2358 / #2357) — `gh workflow run publish-workspace-server-image.yml`
# POSTs to /actions/workflows/.../dispatches which requires this scope.
# Without it the call 403s and the publish/canary/redeploy chain still
# doesn't run on staging→main promotions, undoing #2358.
actions: write
# Serialize auto-promote runs. Multiple staging gate completions can land
# in quick succession (CI + E2E + CodeQL all finish within seconds of
# each other on a green PR) — without this, two parallel runs both:
# 1. Open / re-use the same promote PR.
# 2. Both call `gh pr merge --auto` (idempotent — fine).
# 3. Both poll for the same mergedAt and both `gh workflow run` publish
# → 2× redundant publish builds racing for the same `:staging-latest`
# retag, and 2× canary-verify chains.
# cancel-in-progress: false because we don't want a brand-new run to kill
# a polling-tail that's about to dispatch — the polling tail's 30 min cap
# is the right backstop, not workflow-level cancel.
# 1. Would race the GET-or-POST PR step.
# 2. Would both call merge-schedule (idempotent — fine on Gitea).
# cancel-in-progress: false because the second run on a fresh staging
# tip should NOT kill the first which has already opened the PR.
concurrency:
group: auto-promote-staging
cancel-in-progress: false
@ -111,126 +188,112 @@ jobs:
all_green: ${{ steps.gates.outputs.all_green }}
head_sha: ${{ steps.gates.outputs.head_sha }}
steps:
# Skip empty-tree promotes (the perpetual auto-promote↔auto-sync cycle
# observed 2026-05-03). Sequence: auto-promote merges via the staging
# merge-queue's MERGE strategy, creating a merge commit on main that
# staging doesn't have. auto-sync then merges main back into staging
# via another merge commit (the queue's MERGE strategy applies on
# the staging side too, even when the workflow's local FF would
# have sufficed). Now staging has a new merge-commit SHA whose
# tree == main's tree — but auto-promote sees "staging ahead of
# main by 1" and opens YET another empty promote PR. Each round
# costs ~30-40 min wallclock, ~2 manual approvals, and burns a
# full CodeQL Go run (~15 min). Without this guard the cycle
# repeats indefinitely.
#
# Long-term fix is to switch the merge_queue ruleset's
# `merge_method` away from MERGE so FF-able PRs land cleanly,
# but that's a broader change affecting every staging PR's
# commit shape. This guard is the one-line surgical fix that
# breaks the cycle without touching merge-queue config.
#
# Fail-open: if `git diff` errors for any reason, fall through
# to the gate check (preserve existing behavior). Only skip
# when the diff is DEFINITIVELY empty.
# Skip empty-tree promotes (the perpetual auto-promote↔auto-sync
# cycle observed pre-cutover on GitHub). On Gitea the cycle shape
# is different (auto-sync uses fast-forward, no merge commit),
# but the tree-diff guard is cheap insurance and protects against
# any future merge-style regression.
- name: Checkout for tree-diff check
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
fetch-depth: 0
ref: staging
- name: Skip if staging tree == main tree (perpetual-cycle break)
- name: Skip if staging tree == main tree (cycle-break safety)
id: tree-diff
env:
HEAD_SHA: ${{ github.event.workflow_run.head_sha || github.sha }}
run: |
set -eu
git fetch origin main --depth=50 || { echo "::warning::git fetch main failed — proceeding (fail-open)"; exit 0; }
# Compare staging tip's tree against main's tree. `git diff
# --quiet` exits 0 if no differences, 1 if there are.
if git diff --quiet origin/main "$HEAD_SHA" -- 2>/dev/null; then
{
echo "## ⏭ Skipped — no code to promote"
echo "## Skipped — no code to promote"
echo
echo "staging tip (\`${HEAD_SHA:0:8}\`) and \`main\` have identical trees."
echo "This is the auto-promote↔auto-sync merge-commit cycle: staging has a"
echo "new SHA (a sync-back merge commit) but the underlying file tree is"
echo "already on main, so there's no real code to ship."
echo
echo "Skipping to avoid opening an empty promote PR. Cycle terminates here."
echo "Skipping to avoid opening an empty promote PR."
} >> "$GITHUB_STEP_SUMMARY"
echo "::notice::auto-promote: staging tree == main tree — no code to promote, skipping"
echo "skip=true" >> "$GITHUB_OUTPUT"
else
echo "skip=false" >> "$GITHUB_OUTPUT"
fi
- name: Check all required gates on this SHA
- name: Check combined status on staging head
if: steps.tree-diff.outputs.skip != 'true'
id: gates
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITEA_TOKEN: ${{ secrets.AUTO_SYNC_TOKEN }}
HEAD_SHA: ${{ github.event.workflow_run.head_sha || github.sha }}
REPO: ${{ github.repository }}
GITEA_HOST: ${{ vars.GITEA_HOST || 'https://git.moleculesai.app' }}
run: |
set -euo pipefail
# Required gate workflow files. Use file paths (relative to
# .github/workflows/) rather than display names because:
# Gitea-native combined-status endpoint aggregates every
# check context attached to a SHA. This is structurally
# cleaner than the GitHub-era per-workflow `gh run list`
# loop because:
#
# 1. `gh run list --workflow=<name>` is ambiguous when two
# workflows have the same `name:` — observed 2026-04-28
# with "CodeQL" matching both `codeql.yml` (explicit) and
# GitHub's UI-configured Code-quality default setup
# (internal "codeql"). gh CLI returns "could not resolve
# to a unique workflow" → empty result → gate evaluated
# as missing/none → auto-promote dead-locked despite all
# checks actually passing.
# 1. There's no risk of "workflow name collision" (the
# GitHub-era code had to switch from `--workflow=NAME`
# to `--workflow=FILE.YML` to disambiguate "CodeQL"
# between the explicit workflow and GitHub's UI-
# configured default setup; Gitea has no such
# duplicate-name surface).
# 2. Gitea's combined state already encodes the AND
# across all contexts: success only if EVERY context
# is success. Pending or failure on any context
# produces non-success state.
#
# 2. File paths are the unique identifier for workflows;
# `name:` is just a display string and can collide.
#
# When adding/removing a gate, update this list AND the
# branch-protection required-checks list (which uses check-run
# display names, not workflow names; the two are decoupled and
# should be kept in sync manually).
GATES=(
"ci.yml"
"e2e-staging-canvas.yml"
"e2e-api.yml"
"codeql.yml"
)
# See https://docs.gitea.com/api/1.22 for the schema —
# `state` is one of: success, pending, failure, error.
echo "head_sha=${HEAD_SHA}" >> "$GITHUB_OUTPUT"
echo "Checking gates on SHA ${HEAD_SHA}"
echo "Checking combined status on SHA ${HEAD_SHA}"
ALL_GREEN=true
for gate in "${GATES[@]}"; do
# Query the most recent run of this workflow on this SHA.
# event=push to avoid picking up PR runs. branch=staging to
# guard against someone dispatching the gate on a non-staging
# branch at the same SHA.
RESULT=$(gh run list \
--repo "$REPO" \
--workflow "$gate" \
--branch staging \
--event push \
--commit "$HEAD_SHA" \
--limit 1 \
--json status,conclusion \
--jq '.[0] | "\(.status)/\(.conclusion // "none")"' \
2>/dev/null || echo "missing/none")
# `set +o pipefail` for the http-code capture pattern; restore
# immediately. Pattern hardened per `feedback_curl_status_capture_pollution`.
BODY_FILE=$(mktemp)
set +e
STATUS=$(curl -sS \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Accept: application/json" \
-o "${BODY_FILE}" \
-w "%{http_code}" \
"${GITEA_HOST}/api/v1/repos/${REPO}/commits/${HEAD_SHA}/status")
CURL_RC=$?
set -e
echo " $gate → $RESULT"
if [ "${CURL_RC}" -ne 0 ] || [ "${STATUS}" != "200" ]; then
echo "::error::combined-status fetch failed: curl=${CURL_RC} http=${STATUS}"
cat "${BODY_FILE}" | head -c 500 || true
rm -f "${BODY_FILE}"
echo "all_green=false" >> "$GITHUB_OUTPUT"
exit 0
fi
# Only completed/success counts. completed/failure or
# in_progress/anything or no record at all = abort.
if [ "$RESULT" != "completed/success" ]; then
ALL_GREEN=false
fi
done
STATE=$(jq -r '.state // "missing"' < "${BODY_FILE}")
TOTAL=$(jq -r '.total_count // 0' < "${BODY_FILE}")
rm -f "${BODY_FILE}"
echo "all_green=${ALL_GREEN}" >> "$GITHUB_OUTPUT"
if [ "$ALL_GREEN" != "true" ]; then
echo "::notice::auto-promote: not all gates are green on ${HEAD_SHA} — staying on current main"
echo "Combined status: state=${STATE} total_count=${TOTAL}"
if [ "${STATE}" = "success" ] && [ "${TOTAL}" -gt 0 ]; then
echo "all_green=true" >> "$GITHUB_OUTPUT"
echo "::notice::All gates green on ${HEAD_SHA} (${TOTAL} contexts)"
else
echo "all_green=false" >> "$GITHUB_OUTPUT"
{
echo "## Not promoting — combined status not green"
echo
echo "- SHA: \`${HEAD_SHA:0:8}\`"
echo "- Combined state: \`${STATE}\`"
echo "- Context count: ${TOTAL}"
echo
echo "Will re-fire on the next gate completion. Investigate any red gate via the Actions UI."
} >> "$GITHUB_STEP_SUMMARY"
echo "::notice::auto-promote: combined status is ${STATE} on ${HEAD_SHA} — staying on current main"
fi
promote:
@ -247,188 +310,183 @@ jobs:
# Repo variable AUTO_PROMOTE_ENABLED=true flips this on. While
# it's unset, the workflow dry-runs (logs what it would have
# done) but doesn't open the promote PR. Set the variable in
# Settings → Secrets and variables → Actions → Variables.
# Settings → Actions → Variables.
if [ "${AUTO_PROMOTE_ENABLED:-}" != "true" ] && [ "${FORCE_INPUT:-false}" != "true" ]; then
{
echo "## ⏸ Auto-promote disabled"
echo "## Auto-promote disabled"
echo
echo "Repo variable \`AUTO_PROMOTE_ENABLED\` is not set to \`true\`."
echo "All gates are green on staging; would have opened a promote PR to \`main\`."
echo
echo "To enable: Settings → Secrets and variables → Actions → Variables → \`AUTO_PROMOTE_ENABLED=true\`."
echo "To enable: Settings → Actions → Variables → \`AUTO_PROMOTE_ENABLED=true\`."
echo "To test once manually: workflow_dispatch with \`force=true\`."
} >> "$GITHUB_STEP_SUMMARY"
echo "::notice::auto-promote disabled — dry run only"
exit 0
fi
# Mint the App token BEFORE the promote-PR step so the auto-merge
# call can use it. GITHUB_TOKEN-initiated merges suppress the
# downstream `push` event on main, breaking the
# publish-workspace-server-image → canary-verify → redeploy-tenants
# chain (issue #2357). Using the App token here means the
# merge-queue-landed merge IS able to fire the cascade naturally;
# the polling tail below stays as defense-in-depth.
- name: Mint App token for promote-PR + downstream dispatch
if: ${{ vars.AUTO_PROMOTE_ENABLED == 'true' || github.event.inputs.force == 'true' }}
id: app-token
uses: actions/create-github-app-token@1b10c78c7865c340bc4f6099eb2f838309f1e8c3 # v3.1.1
with:
app-id: ${{ secrets.MOLECULE_AI_APP_ID }}
private-key: ${{ secrets.MOLECULE_AI_APP_PRIVATE_KEY }}
- name: Open (or reuse) staging → main promote PR + enable auto-merge
- name: Open or reuse promote PR + schedule auto-merge
if: ${{ vars.AUTO_PROMOTE_ENABLED == 'true' || github.event.inputs.force == 'true' }}
env:
GH_TOKEN: ${{ steps.app-token.outputs.token }}
GITEA_TOKEN: ${{ secrets.AUTO_SYNC_TOKEN }}
REPO: ${{ github.repository }}
TARGET_SHA: ${{ needs.check-all-gates-green.outputs.head_sha }}
GITEA_HOST: ${{ vars.GITEA_HOST || 'https://git.moleculesai.app' }}
run: |
set -euo pipefail
# Look for an existing open promote PR (idempotent on re-run
# of the workflow). The PR's head IS the staging branch — the
# whole point is "advance main to staging's tip", so we don't
# need a per-SHA branch like auto-sync-main-to-staging uses.
PR_NUM=$(gh pr list --repo "$REPO" \
--base main --head staging --state open \
--json number --jq '.[0].number // ""')
API="${GITEA_HOST}/api/v1/repos/${REPO}"
AUTH=(-H "Authorization: token ${GITEA_TOKEN}" -H "Accept: application/json")
if [ -z "$PR_NUM" ]; then
# http_status_get RESULT_VAR URL
# Sets RESULT_VAR to "<http_code>:<body_file>". Curl status
# capture pattern per `feedback_curl_status_capture_pollution`:
# http_code goes to its own tempfile-equivalent (-w), body to
# another tempfile, set +e/-e bracket protects pipeline state.
http_get() {
local body_file="$1"; shift
local url="$1"; shift
set +e
local code
code=$(curl -sS "${AUTH[@]}" -o "${body_file}" -w "%{http_code}" "${url}")
local rc=$?
set -e
if [ "${rc}" -ne 0 ]; then
echo "::error::curl GET failed (rc=${rc}) on ${url}"
return 99
fi
echo "${code}"
}
http_post_json() {
local body_file="$1"; shift
local data="$1"; shift
local url="$1"; shift
set +e
local code
code=$(curl -sS "${AUTH[@]}" -H "Content-Type: application/json" \
-X POST -d "${data}" -o "${body_file}" -w "%{http_code}" "${url}")
local rc=$?
set -e
if [ "${rc}" -ne 0 ]; then
echo "::error::curl POST failed (rc=${rc}) on ${url}"
return 99
fi
echo "${code}"
}
# Step 1: look for an existing open staging→main promote PR
# (idempotent on workflow re-run). Gitea doesn't have a
# head/base filter on the list endpoint that's as ergonomic
# as gh's, but the dedicated `/pulls/{base}/{head}` lookup
# works.
BODY=$(mktemp)
STATUS=$(http_get "${BODY}" "${API}/pulls/main/staging") || true
PR_NUM=""
if [ "${STATUS}" = "200" ]; then
STATE=$(jq -r '.state // "missing"' < "${BODY}")
if [ "${STATE}" = "open" ]; then
PR_NUM=$(jq -r '.number // ""' < "${BODY}")
echo "::notice::Re-using existing open promote PR #${PR_NUM}"
fi
fi
rm -f "${BODY}"
# Step 2: if no open PR, create one.
if [ -z "${PR_NUM}" ]; then
TITLE="staging → main: auto-promote ${TARGET_SHA:0:7}"
BODY_FILE=$(mktemp)
cat > "$BODY_FILE" <<EOFBODY
Automated promotion of \`staging\` (\`${TARGET_SHA:0:8}\`) to \`main\`. All required staging gates green at this SHA: CI, E2E Staging Canvas, E2E API Smoke, CodeQL.
BODY_TEXT=$(cat <<EOFBODY
Automated promotion of \`staging\` (\`${TARGET_SHA:0:8}\`) to \`main\`. All required staging gates are green at this SHA (combined status reported success).
This PR is auto-generated by \`.github/workflows/auto-promote-staging.yml\` whenever every required gate completes green on the same staging SHA. It exists because main's branch protection requires status checks "set by the expected GitHub apps" — direct \`git push\` from a workflow can't satisfy that, only PR merges through the queue can.
This PR is auto-generated by \`.github/workflows/auto-promote-staging.yml\` whenever every required gate completes green on the same staging SHA.
Merge queue lands this; no human action needed unless gates fail. Reverse-direction sync (the merge commit on main → staging) is handled by \`auto-sync-main-to-staging.yml\`.
**Approval gate:** \`main\` branch protection requires 1 approval before this can land. Once approved, Gitea will auto-merge (the workflow scheduled \`merge_when_checks_succeed: true\` immediately after open).
The reverse-direction sync (the merge commit on \`main\` → \`staging\`) is handled automatically by \`auto-sync-main-to-staging.yml\` after this PR lands.
---
- Source: staging at \`${TARGET_SHA}\`
- Opened by: \`devops-engineer\` persona (anti-bot-ring; never founder PAT)
- Refs: #65, #73, #195
EOFBODY
PR_URL=$(gh pr create --repo "$REPO" \
--base main --head staging \
--title "$TITLE" \
--body-file "$BODY_FILE")
PR_NUM=$(echo "$PR_URL" | grep -oE '[0-9]+$' | tail -1)
rm -f "$BODY_FILE"
echo "::notice::Opened PR #${PR_NUM}"
else
echo "::notice::Re-using existing promote PR #${PR_NUM}"
)
REQ=$(jq -n \
--arg title "${TITLE}" \
--arg body "${BODY_TEXT}" \
--arg base "main" \
--arg head "staging" \
'{title:$title, body:$body, base:$base, head:$head}')
BODY=$(mktemp)
STATUS=$(http_post_json "${BODY}" "${REQ}" "${API}/pulls")
if [ "${STATUS}" = "201" ]; then
PR_NUM=$(jq -r '.number // ""' < "${BODY}")
echo "::notice::Opened promote PR #${PR_NUM}"
else
echo "::error::Failed to create promote PR: HTTP ${STATUS}"
jq -r '.message // .' < "${BODY}" | head -c 500
rm -f "${BODY}"
exit 1
fi
rm -f "${BODY}"
fi
# Enable auto-merge — the merge queue picks it up once
# required gates are green on the merge_group ref.
if ! gh pr merge "$PR_NUM" --repo "$REPO" --auto --merge 2>&1; then
echo "::warning::Failed to enable auto-merge on PR #${PR_NUM} — operator may need to merge manually."
fi
# Step 3: schedule auto-merge. merge_when_checks_succeed
# tells Gitea to wait for both:
# - all required status checks to pass
# - the required-approvals gate (1 approval on main)
# before merging. On approval+green, Gitea merges within
# seconds. On any check failing or approval being denied,
# the schedule stays armed but doesn't fire.
#
# Idempotent: re-arming on an already-armed PR is a no-op.
REQ=$(jq -n '{Do:"merge", merge_when_checks_succeed:true}')
BODY=$(mktemp)
STATUS=$(http_post_json "${BODY}" "${REQ}" "${API}/pulls/${PR_NUM}/merge")
# Gitea returns:
# - 200/204 on successful immediate merge (gates already green AND approved)
# - 405 "Please try again later" when scheduled successfully but waiting
# - 422 on "Pull request is not mergeable" (conflict, stale base, etc.)
#
# 405 here is benign — Gitea's way of saying "scheduled, not merging now".
# We treat 200/204/405 as success, anything else as failure.
case "${STATUS}" in
200|204)
MERGE_OUTCOME="merged-immediately"
echo "::notice::Promote PR #${PR_NUM} merged immediately (gates+approval already green)"
;;
405)
MERGE_OUTCOME="auto-merge-scheduled"
echo "::notice::Promote PR #${PR_NUM}: auto-merge scheduled (Gitea will land on approval+green)"
;;
422)
MERGE_OUTCOME="not-mergeable"
echo "::warning::Promote PR #${PR_NUM}: not mergeable (conflict, stale base, or already merging)."
jq -r '.message // .' < "${BODY}" | head -c 500
;;
*)
echo "::error::Unexpected status ${STATUS} on merge schedule"
jq -r '.message // .' < "${BODY}" | head -c 500
rm -f "${BODY}"
exit 1
;;
esac
rm -f "${BODY}"
{
echo "## ✅ Auto-promote PR opened"
echo "## Auto-promote PR opened"
echo
echo "- Source: staging at \`${TARGET_SHA:0:8}\`"
echo "- PR: #${PR_NUM}"
echo "- Outcome: \`${MERGE_OUTCOME}\`"
echo
echo "Merge queue lands the PR once required gates are green; no human action needed unless gates fail."
if [ "${MERGE_OUTCOME}" = "auto-merge-scheduled" ]; then
echo "Gitea will auto-merge once Hongming approves and all checks are green. No human action needed beyond approval."
elif [ "${MERGE_OUTCOME}" = "merged-immediately" ]; then
echo "Merged immediately. \`publish-workspace-server-image.yml\` will fire naturally on the resulting \`main\` push."
else
echo "PR is not auto-merging. Operator may need to bring staging up to date with main, then re-trigger this workflow via workflow_dispatch."
fi
} >> "$GITHUB_STEP_SUMMARY"
# Hand the PR number to the next step so we can dispatch the
# tenant-redeploy chain after the merge queue lands the merge.
echo "promote_pr_num=${PR_NUM}" >> "$GITHUB_OUTPUT"
id: promote_pr
# The App token minted above (before the promote-PR step) is
# also used by the polling tail below. Defense-in-depth: with
# the merge-queue-landed merge now using the App token, the
# main-branch push event SHOULD fire the publish/canary/redeploy
# cascade naturally — but if for any reason it doesn't (e.g. an
# unrelated event-suppression edge case), the explicit dispatches
# below still wake the chain.
- name: Wait for promote merge, then dispatch publish + redeploy (#2357)
# Defense-in-depth dispatch. With the auto-merge call above
# now using the App token (this commit), the merge-queue-landed
# merge SHOULD fire publish-workspace-server-image naturally
# via on:push:[main] — App-token-initiated pushes DO trigger
# workflow_run cascades, unlike GITHUB_TOKEN-initiated ones
# (the documented "no recursion" rule —
# https://docs.github.com/en/actions/using-workflows/triggering-a-workflow#triggering-a-workflow-from-a-workflow).
#
# This explicit dispatch stays as belt-and-suspenders for any
# edge case where the natural cascade misfires. If it never
# observably fires after this token swap (i.e. the publish
# workflow has already started by the time we get here), the
# second dispatch is a harmless no-op (publish-workspace-server-image
# has its own concurrency group that dedupes).
#
# See PR for #2357: pre-fix the merge action was via
# GITHUB_TOKEN, suppressing the cascade and forcing this tail
# to be the SOLE chain trigger. With the auto-merge token swap
# the tail becomes redundant in the happy path; keep until
# we've observed >=10 successful natural cascades, then drop.
if: steps.promote_pr.outputs.promote_pr_num != ''
env:
GH_TOKEN: ${{ steps.app-token.outputs.token }}
REPO: ${{ github.repository }}
PR_NUM: ${{ steps.promote_pr.outputs.promote_pr_num }}
run: |
# Poll for merge — max 30 min (60 × 30s). The merge queue
# typically lands within 5-10 min when gates are green. Break
# early if the PR is closed without merging (operator action,
# gates flipped red post-approval, branch-protection rejection)
# so we don't tie up a runner for the full 30 min on a dead PR.
MERGED=""
STATE=""
for _ in $(seq 1 60); do
VIEW=$(gh pr view "$PR_NUM" --repo "$REPO" --json mergedAt,state)
MERGED=$(echo "$VIEW" | jq -r '.mergedAt // ""')
STATE=$(echo "$VIEW" | jq -r '.state // ""')
if [ -n "$MERGED" ] && [ "$MERGED" != "null" ]; then
echo "::notice::Promote PR #${PR_NUM} merged at ${MERGED}"
break
fi
if [ "$STATE" = "CLOSED" ]; then
echo "::warning::Promote PR #${PR_NUM} was closed without merging — skipping deploy dispatch."
exit 0
fi
sleep 30
done
if [ -z "$MERGED" ] || [ "$MERGED" = "null" ]; then
echo "::warning::Promote PR #${PR_NUM} didn't merge within 30min — skipping deploy dispatch (manually run \`gh workflow run publish-workspace-server-image.yml --ref main\` once it lands)."
exit 0
fi
# Dispatch publish on main using the App token. App-initiated
# workflow_dispatch DOES propagate the workflow_run cascade,
# unlike GITHUB_TOKEN-initiated dispatch.
# publish completes → canary-verify chains via workflow_run →
# redeploy-tenants-on-main chains via workflow_run + branches:[main].
if gh workflow run publish-workspace-server-image.yml \
--repo "$REPO" --ref main 2>&1; then
echo "::notice::Dispatched publish-workspace-server-image on ref=main as molecule-ai App — canary-verify and redeploy-tenants-on-main will chain via workflow_run."
{
echo "## 🚀 Tenant redeploy chain dispatched"
echo
echo "- publish-workspace-server-image (workflow_dispatch on \`main\`, actor: \`molecule-ai[bot]\`)"
echo "- canary-verify will chain on completion"
echo "- redeploy-tenants-on-main will chain on canary green"
} >> "$GITHUB_STEP_SUMMARY"
else
echo "::error::Failed to dispatch publish-workspace-server-image. Run manually: gh workflow run publish-workspace-server-image.yml --ref main"
fi
# ALSO dispatch auto-sync-main-to-staging.yml. Same root cause as
# publish above (issue #2357): the merge-queue-initiated push to
# main is by GITHUB_TOKEN → no `on: push` triggers fire downstream.
# Without this dispatch, every staging→main promote leaves staging
# one merge commit BEHIND main, which silently dead-locks the NEXT
# promote PR as `mergeStateStatus: BEHIND` because main's
# branch-protection has `strict: true`. Verified empirically on
# 2026-05-02 against PR #2442 (Phase 2 promote): only the explicit
# publish-workspace-server-image dispatch fired on the previous
# promote SHA 76c604fb, while auto-sync silently no-op'd, leaving
# staging behind for ~24h until manually bridged.
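The BEHIND dead-lock described above is quick to confirm from an operator shell (a sketch using GitHub-era tooling, since this comment documents the pre-cutover behaviour):

gh pr view "$PR_NUM" --repo "$REPO" --json mergeStateStatus --jq '.mergeStateStatus'
# BEHIND → main has a commit the PR branch lacks, and strict required checks block the merge.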
if gh workflow run auto-sync-main-to-staging.yml \
--repo "$REPO" --ref main 2>&1; then
echo "::notice::Dispatched auto-sync-main-to-staging on ref=main as molecule-ai App — staging will absorb the new main merge commit via PR + merge queue."
else
echo "::error::Failed to dispatch auto-sync-main-to-staging. Run manually: gh workflow run auto-sync-main-to-staging.yml --ref main"
fi
@ -1,9 +1,10 @@
'use client';
import { useEffect, useMemo, useCallback } from "react";
import { useEffect, useMemo, useCallback, useRef } from "react";
import { type Edge, MarkerType } from "@xyflow/react";
import { api } from "@/lib/api";
import { useCanvasStore } from "@/store/canvas";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import type { ActivityEntry } from "@/types/activity";
// ── Constants ─────────────────────────────────────────────────────────────────
@ -11,9 +12,6 @@ import type { ActivityEntry } from "@/types/activity";
/** 60-minute look-back window for delegation activity */
export const A2A_WINDOW_MS = 60 * 60 * 1000;
/** Polling interval — refresh edges every 60 seconds */
export const A2A_POLL_MS = 60 * 1_000;
/** Threshold for "hot" edges: < 5 minutes → animated + violet stroke */
export const A2A_HOT_MS = 5 * 60 * 1_000;
@ -131,6 +129,20 @@ export function buildA2AEdges(
* `a2aEdges`. Canvas.tsx merges these with topology edges and passes the
* combined list to ReactFlow.
*
* Update shape (issue #61 Stage 2, replaces the 60s polling loop):
* - On mount (when showA2AEdges): one HTTP fan-out per visible workspace
* (delegation rows, 60-min window). Bootstraps the local row buffer.
* - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent.
* Each delegation event from a visible workspace is appended to the
* buffer; edges are re-derived via the existing buildA2AEdges helper.
* - showA2AEdges toggle off: clears edges + buffer.
* - Visible-ID-set change: re-bootstraps so a freshly-shown workspace
* backfills its 60-min history (existing visibleIdsKey selector
* behaviour preserved — that's the 2026-05-04 render-loop fix).
*
* No interval poll. The singleton ReconnectingSocket already owns
* reconnect / backoff / health-check; useSocketEvent inherits those.
*
* Mount this inside CanvasInner (no ReactFlow hook dependency).
*/
export function A2ATopologyOverlay() {
@ -157,7 +169,9 @@ export function A2ATopologyOverlay() {
// the symptom of this re-render storm.
//
// The fix is purely the dependency-stability change here; the fetch
// logic is unchanged.
// logic is unchanged. Post-#61 the polling-driven fetch is gone, but
// the visibleIdsKey gate is still required so a peer-discovery write
// doesn't trigger a wasteful re-bootstrap.
const visibleIdsKey = useCanvasStore((s) =>
s.nodes
.filter((n) => !n.hidden)
@ -171,16 +185,42 @@ export function A2ATopologyOverlay() {
[visibleIdsKey]
);
// Fetch delegation activity for all visible workspaces and rebuild overlay edges.
const fetchAndUpdate = useCallback(async () => {
// Local rolling buffer of delegation rows. Pruned by A2A_WINDOW_MS on
// each rebuild so a long-lived session doesn't accumulate unbounded
// history. The buffer's high-water mark is approximately:
// visibleIds.length × bootstrap-fetch-limit (500) + WS arrivals
// Real-world ceiling: ~3000 entries at the 60-min boundary, all of
// which buildA2AEdges aggregates into at most N² edges.
const bufferRef = useRef<ActivityEntry[]>([]);
// visibleIdsRef gives the WS handler the latest visible-ID set without
// re-subscribing on every render. The bus listener is registered
// exactly once per mount; subscriber-side filtering reads from this ref.
const visibleIdsRef = useRef(visibleIds);
visibleIdsRef.current = visibleIds;
// Re-derive overlay edges from the current buffer + push to store.
// Prunes by A2A_WINDOW_MS first so memory stays bounded across long
// sessions and the aggregation cost stays O(window-size).
const recomputeAndPush = useCallback(() => {
const cutoff = Date.now() - A2A_WINDOW_MS;
bufferRef.current = bufferRef.current.filter(
(r) => new Date(r.created_at).getTime() > cutoff
);
setA2AEdges(buildA2AEdges(bufferRef.current));
}, [setA2AEdges]);
// Bootstrap fan-out — one HTTP per visible workspace. Replaces the
// 60s polling loop entirely. Race-aware: any WS arrivals that landed
// in the buffer DURING the fetch (between the await and resume) are
// preserved by id-dedup-with-fetched-first ordering.
const bootstrap = useCallback(async () => {
if (visibleIds.length === 0) {
bufferRef.current = [];
setA2AEdges([]);
return;
}
try {
// Fan-out — one request per visible workspace.
// Per-request failures are swallowed so one broken workspace doesn't blank the overlay.
const allRows = (
const fetchedRows = (
await Promise.all(
visibleIds.map((id) =>
api
@ -192,24 +232,76 @@ export function A2ATopologyOverlay() {
)
).flat();
setA2AEdges(buildA2AEdges(allRows));
// Merge: fetched rows first, then any in-flight WS arrivals that
// accumulated during the await. Dedup by id so rows that appear
// in both paths are not double-counted in the aggregation.
const merged = [...fetchedRows, ...bufferRef.current];
const seen = new Set<string>();
bufferRef.current = merged.filter((r) => {
if (seen.has(r.id)) return false;
seen.add(r.id);
return true;
});
recomputeAndPush();
} catch {
// Overlay failure is non-critical — canvas remains functional
}
}, [visibleIds, setA2AEdges]);
}, [visibleIds, setA2AEdges, recomputeAndPush]);
useEffect(() => {
if (!showA2AEdges) {
// Clear edges immediately when toggled off
// Clear edges + buffer immediately when toggled off
bufferRef.current = [];
setA2AEdges([]);
return;
}
void bootstrap();
}, [showA2AEdges, bootstrap, setA2AEdges]);
// Initial fetch, then poll every 60 s
void fetchAndUpdate();
const timer = setInterval(() => void fetchAndUpdate(), A2A_POLL_MS);
return () => clearInterval(timer);
}, [showA2AEdges, fetchAndUpdate, setA2AEdges]);
// Live-update path. Filters server-side ACTIVITY_LOGGED events down
// to delegation initiations from visible workspaces and appends each
// into the rolling buffer, re-deriving edges via buildA2AEdges.
//
// Only `method === "delegate"` rows count — the same filter
// buildA2AEdges applies — so delegate_result rows arriving over the
// wire don't double-count.
useSocketEvent((msg) => {
if (!showA2AEdges) return;
if (msg.event !== "ACTIVITY_LOGGED") return;
const p = (msg.payload || {}) as Record<string, unknown>;
if (p.activity_type !== "delegation") return;
if (p.method !== "delegate") return;
const wsId = msg.workspace_id;
if (!visibleIdsRef.current.includes(wsId)) return;
// Synthesise an ActivityEntry from the WS payload so buildA2AEdges
// (which the bootstrap path also feeds) handles it identically.
const entry: ActivityEntry = {
id:
(p.id as string) ||
`ws-push-${msg.timestamp || Date.now()}-${wsId}`,
workspace_id: wsId,
activity_type: "delegation",
source_id: (p.source_id as string | null) ?? null,
target_id: (p.target_id as string | null) ?? null,
method: "delegate",
summary: (p.summary as string | null) ?? null,
request_body: null,
response_body: null,
duration_ms: (p.duration_ms as number | null) ?? null,
status: (p.status as string) || "ok",
error_detail: null,
created_at:
(p.created_at as string) ||
msg.timestamp ||
new Date().toISOString(),
};
bufferRef.current = [...bufferRef.current, entry];
recomputeAndPush();
});
// Pure side-effect — renders nothing
return null;
@ -3,6 +3,7 @@
import { useState, useEffect, useCallback, useRef } from "react";
import { useCanvasStore } from "@/store/canvas";
import { api } from "@/lib/api";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { COMM_TYPE_LABELS } from "@/lib/design-tokens";
interface Communication {
@ -18,32 +19,71 @@ interface Communication {
durationMs: number | null;
}
/** Workspace-server `ACTIVITY_LOGGED` payload shape. Pulled out so the
* WS handler below has a typed view of the same fields the HTTP
* bootstrap consumes — drift between the two paths is a class of bug
* AgentCommsPanel hit historically. */
interface ActivityLoggedPayload {
id?: string;
activity_type?: string;
source_id?: string | null;
target_id?: string | null;
workspace_id?: string;
summary?: string | null;
status?: string;
duration_ms?: number | null;
created_at?: string;
}
/** Fan-out cap for the bootstrap HTTP fetch on mount / on visibility
* re-open. Kept at 3 (carried over from the 2026-05-04 fix) so a
* freshly-mounted overlay on a 15-workspace tenant only spends 3
* round-trips bootstrapping. Live updates after that arrive via the
* WS subscription below — no polling, no fan-out to maintain. */
const BOOTSTRAP_FAN_OUT_CAP = 3;
/** Cap on the rendered list. Bootstrap + every WS push prepends, the
* list is sliced to this size after each update. Mirrors the prior
* polling-loop behaviour. */
const COMMS_RENDER_CAP = 20;
/**
* Overlay showing recent A2A communications between workspaces.
* Renders as a floating log panel that auto-updates.
*
* Update shape (issue #61 Stage 1, replaces the 30s polling loop):
* - On mount (when visible): one HTTP bootstrap per online workspace,
* capped at BOOTSTRAP_FAN_OUT_CAP. Yields the initial recent-comms
* window without waiting for live events.
* - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent.
* Each event with a matching activity_type from a visible online
* workspace gets synthesised into a Communication and prepended.
* - Visibility re-open: re-bootstraps so the user sees the freshest
* window even if WS was idle while collapsed.
*
* No interval poll. The singleton ReconnectingSocket in `store/socket.ts`
* already owns reconnect/backoff/health-check, and `useSocketEvent`
* inherits those guarantees. If WS is genuinely unhealthy, the overlay
* shows the bootstrap snapshot until the next visibility re-open or
* the next WS reconnect (which fires its own rehydrate burst).
*/
export function CommunicationOverlay() {
const [comms, setComms] = useState<Communication[]>([]);
const [visible, setVisible] = useState(true);
const selectedNodeId = useCanvasStore((s) => s.selectedNodeId);
const nodes = useCanvasStore((s) => s.nodes);
// nodesRef gives the WS handler current node-name resolution without
// re-subscribing on every node-list change. The bus listener is
// registered exactly once per mount; subscriber-side filtering reads
// the latest value via this ref.
const nodesRef = useRef(nodes);
nodesRef.current = nodes;
const fetchComms = useCallback(async () => {
const bootstrapComms = useCallback(async () => {
try {
// Fan-out cap: each polled workspace = 1 round-trip. The platform
// rate limits at 600 req/min/IP; combined with heartbeats + other
// canvas polling, every workspace polled here costs ~6 req/min
// (1 every 30s × 1 per workspace). Capping at 3 keeps this
// overlay's footprint at 18 req/min worst case — well under
// budget even with 8+ workspaces visible. Caught 2026-05-04 when
// a user with 8+ workspaces (Design Director + 6 sub-agents +
// 3 standalones) saw sustained 429s in canvas console.
const onlineNodes = nodesRef.current.filter((n) => n.data.status === "online");
const allComms: Communication[] = [];
for (const node of onlineNodes.slice(0, 3)) {
for (const node of onlineNodes.slice(0, BOOTSTRAP_FAN_OUT_CAP)) {
try {
const activities = await api.get<Array<{
id: string;
@ -59,8 +99,8 @@ export function CommunicationOverlay() {
for (const a of activities) {
if (a.activity_type === "a2a_send" || a.activity_type === "a2a_receive") {
const sourceNode = nodes.find((n) => n.id === (a.source_id || a.workspace_id));
const targetNode = nodes.find((n) => n.id === (a.target_id || ""));
const sourceNode = nodesRef.current.find((n) => n.id === (a.source_id || a.workspace_id));
const targetNode = nodesRef.current.find((n) => n.id === (a.target_id || ""));
allComms.push({
id: a.id,
sourceId: a.source_id || a.workspace_id,
@ -76,11 +116,12 @@ export function CommunicationOverlay() {
}
}
} catch {
// Skip workspaces that fail
// Per-workspace failures must not blank the panel — the same
// robustness the polling version had.
}
}
// Sort by timestamp, newest first, dedupe
// Newest-first with id-dedup, capped at COMMS_RENDER_CAP.
const seen = new Set<string>();
const sorted = allComms
.sort((a, b) => b.timestamp.localeCompare(a.timestamp))
@ -89,29 +130,78 @@ export function CommunicationOverlay() {
seen.add(c.id);
return true;
})
.slice(0, 20);
.slice(0, COMMS_RENDER_CAP);
setComms(sorted);
} catch {
// Silently handle API errors
// Bootstrap failure is non-blocking — the WS subscription below
// will populate the panel as live events arrive.
}
}, []);
// Bootstrap once on mount + every time the user re-opens after a
// collapse. Closed-panel state intentionally drops live updates so
// the panel doesn't churn invisible state — the next open reloads.
useEffect(() => {
// Gate polling on visibility — when the user collapses the overlay
// the data isn't being read, so the per-workspace fan-out becomes
// pure rate-limit overhead. Pre-fix this overlay polled regardless
// of whether the panel was shown, costing ~36 req/min from a
// hidden surface.
if (!visible) return;
fetchComms();
// 30s cadence (was 10s). At 3-workspace fan-out that's 6 req/min
// worst case from this overlay. Combined with heartbeats (~30/min)
// and other canvas polling, leaves ample headroom under the 600/
// min/IP server-side rate limit even at 8+ workspace tenants.
const interval = setInterval(fetchComms, 30000);
return () => clearInterval(interval);
}, [fetchComms, visible]);
bootstrapComms();
}, [bootstrapComms, visible]);
// Live-update path. Filters server-side ACTIVITY_LOGGED events down
// to the comm-overlay-relevant subset and prepends each into the
// rendered list with the same dedup the bootstrap path uses.
//
// Scope guard: ignore events for workspaces not in the visible online
// set, so a user collapsing one workspace doesn't see its comms
// continue to scroll in. Same shape the bootstrap path applies.
useSocketEvent((msg) => {
if (!visible) return;
if (msg.event !== "ACTIVITY_LOGGED") return;
const p = (msg.payload || {}) as ActivityLoggedPayload;
const type = p.activity_type;
if (type !== "a2a_send" && type !== "a2a_receive" && type !== "task_update") return;
const wsId = msg.workspace_id;
const onlineSet = new Set(
nodesRef.current.filter((n) => n.data.status === "online").map((n) => n.id),
);
if (!onlineSet.has(wsId)) return;
const sourceId = p.source_id || wsId;
const targetId = p.target_id || "";
const sourceNode = nodesRef.current.find((n) => n.id === sourceId);
const targetNode = nodesRef.current.find((n) => n.id === targetId);
const incoming: Communication = {
id: p.id || `${msg.timestamp || Date.now()}:${sourceId}:${targetId}`,
sourceId,
targetId,
sourceName: sourceNode?.data.name || "Unknown",
targetName: targetNode?.data.name || "Unknown",
type: type as Communication["type"],
summary: p.summary || "",
status: p.status || "ok",
timestamp: p.created_at || msg.timestamp || new Date().toISOString(),
durationMs: p.duration_ms ?? null,
};
setComms((prev) => {
// Prepend, dedup by id, re-cap. Functional setState is necessary
// because two ACTIVITY_LOGGED events arriving in the same React
// batch would otherwise read a stale `comms` from the closure.
const seen = new Set<string>();
const merged = [incoming, ...prev]
.sort((a, b) => b.timestamp.localeCompare(a.timestamp))
.filter((c) => {
if (seen.has(c.id)) return false;
seen.add(c.id);
return true;
})
.slice(0, COMMS_RENDER_CAP);
return merged;
});
});
if (!visible || comms.length === 0) {
return (
@ -41,6 +41,10 @@ vi.mock("@/store/canvas", () => ({
|
||||
// ── Imports (after mocks) ─────────────────────────────────────────────────────
|
||||
|
||||
import { api } from "@/lib/api";
|
||||
import {
|
||||
emitSocketEvent,
|
||||
_resetSocketEventListenersForTests,
|
||||
} from "@/store/socket-events";
|
||||
import {
|
||||
buildA2AEdges,
|
||||
formatA2ARelativeTime,
|
||||
@ -342,6 +346,151 @@ describe("A2ATopologyOverlay component", () => {
|
||||
expect(mockGet.mock.calls.length).toBe(callsAfterMount);
|
||||
});
|
||||
|
||||
// ── #61 Stage 2: ACTIVITY_LOGGED subscription tests ────────────────────────
|
||||
//
|
||||
// Pin the post-#61 behaviour: WS push for delegation contributes to
|
||||
// the overlay's edge buffer with NO additional HTTP fetch. Same shape
|
||||
// as Stage 1 (CommunicationOverlay).
|
||||
|
||||
describe("#61 stage 2 — ACTIVITY_LOGGED subscription", () => {
|
||||
beforeEach(() => {
|
||||
_resetSocketEventListenersForTests();
|
||||
});
|
||||
afterEach(() => {
|
||||
_resetSocketEventListenersForTests();
|
||||
});
|
||||
|
||||
function emitDelegation(overrides: {
|
||||
workspaceId?: string;
|
||||
sourceId?: string;
|
||||
targetId?: string;
|
||||
method?: string;
|
||||
activityType?: string;
|
||||
} = {}) {
|
||||
// Use Date.now() (real time, fake-timer-frozen) rather than the
|
||||
// hardcoded NOW constant — buildA2AEdges prunes by Date.now() -
|
||||
// A2A_WINDOW_MS, so a row dated against the wrong epoch silently
|
||||
// falls outside the window and the test fails for a confusing
|
||||
// reason ("edges array empty" vs "filter dropped my row").
|
||||
const realNow = Date.now();
|
||||
emitSocketEvent({
|
||||
event: "ACTIVITY_LOGGED",
|
||||
workspace_id: overrides.workspaceId ?? "ws-a",
|
||||
timestamp: new Date(realNow).toISOString(),
|
||||
payload: {
|
||||
id: `act-${Math.random().toString(36).slice(2)}`,
|
||||
activity_type: overrides.activityType ?? "delegation",
|
||||
method: overrides.method ?? "delegate",
|
||||
source_id: overrides.sourceId ?? "ws-a",
|
||||
target_id: overrides.targetId ?? "ws-b",
|
||||
status: "ok",
|
||||
created_at: new Date(realNow - 30_000).toISOString(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
it("does NOT poll on a 60s interval after bootstrap (post-#61)", async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
render(<A2ATopologyOverlay />);
|
||||
await act(async () => { await Promise.resolve(); });
|
||||
const callsAfterBootstrap = mockGet.mock.calls.length;
|
||||
expect(callsAfterBootstrap).toBe(2); // ws-a + ws-b
|
||||
|
||||
// Pre-#61: a 60s clock tick would fire a fresh fan-out (2 more
|
||||
// calls). Post-#61: no interval, no extra calls.
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(120_000);
|
||||
});
|
||||
expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap);
|
||||
});
|
||||
|
||||
it("WS push for a delegation event from a visible workspace updates edges with NO HTTP call", async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
render(<A2ATopologyOverlay />);
|
||||
await act(async () => { await Promise.resolve(); await Promise.resolve(); });
|
||||
mockGet.mockClear();
|
||||
mockStoreState.setA2AEdges.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitDelegation({ sourceId: "ws-a", targetId: "ws-b" });
|
||||
});
|
||||
|
||||
// Edges-set called with at least one a2a edge for the new push.
|
||||
const calls = mockStoreState.setA2AEdges.mock.calls;
|
||||
expect(calls.length).toBeGreaterThanOrEqual(1);
|
||||
const lastCall = calls[calls.length - 1][0] as Array<{ id: string }>;
|
||||
expect(lastCall.some((e) => e.id === "a2a-ws-a-ws-b")).toBe(true);
|
||||
|
||||
// Critical: no HTTP fetch fired during the WS path.
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push for a non-delegation activity_type is ignored", async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
render(<A2ATopologyOverlay />);
|
||||
await act(async () => { await Promise.resolve(); });
|
||||
mockStoreState.setA2AEdges.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitDelegation({ activityType: "a2a_send" });
|
||||
});
|
||||
|
||||
// setA2AEdges must not be called by the WS handler — the only
|
||||
// setA2AEdges calls in this test came from the initial bootstrap.
|
||||
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push for a delegate_result row is ignored (mirrors buildA2AEdges filter)", async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
render(<A2ATopologyOverlay />);
|
||||
await act(async () => { await Promise.resolve(); });
|
||||
mockStoreState.setA2AEdges.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitDelegation({ method: "delegate_result" });
|
||||
});
|
||||
|
||||
// delegate_result rows do not contribute to the edge count — they
|
||||
// are completion signals, not initiations.
|
||||
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push from a hidden workspace is ignored", async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
render(<A2ATopologyOverlay />);
|
||||
await act(async () => { await Promise.resolve(); });
|
||||
mockStoreState.setA2AEdges.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitDelegation({ workspaceId: "ws-hidden" });
|
||||
});
|
||||
|
||||
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push while showA2AEdges is false is ignored", async () => {
|
||||
mockStoreState.showA2AEdges = false;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
render(<A2ATopologyOverlay />);
|
||||
// The mount path with showA2AEdges=false calls setA2AEdges([])
|
||||
// once — clear that to isolate the WS path.
|
||||
mockStoreState.setA2AEdges.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitDelegation();
|
||||
});
|
||||
|
||||
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("re-fetches when the visible ID set actually changes", async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
mockGet.mockResolvedValue([] as any);
|
||||
|
||||
@ -36,6 +36,10 @@ vi.mock("@/hooks/useWorkspaceName", () => ({
|
||||
useWorkspaceName: () => () => "Test WS",
|
||||
}));
|
||||
|
||||
import {
|
||||
emitSocketEvent,
|
||||
_resetSocketEventListenersForTests,
|
||||
} from "@/store/socket-events";
|
||||
import { ActivityTab } from "../tabs/ActivityTab";
|
||||
|
||||
// ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
@ -358,6 +362,191 @@ describe("ActivityTab — refresh button", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ── Suite 6.5: ACTIVITY_LOGGED subscription (#61 stage 3) ─────────────────────
|
||||
//
|
||||
// Pin the post-#61 behaviour: WS push extends the rendered list with NO
|
||||
// additional HTTP fetch. The 5s polling loop is gone; live updates
|
||||
// arrive over the WebSocket bus.
|
||||
|
||||
describe("ActivityTab — #61 stage 3: ACTIVITY_LOGGED subscription", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockGet.mockResolvedValue([]);
|
||||
_resetSocketEventListenersForTests();
|
||||
});
|
||||
afterEach(() => {
|
||||
cleanup();
|
||||
_resetSocketEventListenersForTests();
|
||||
});
|
||||
|
||||
function emitActivity(overrides: {
|
||||
workspaceId?: string;
|
||||
activityType?: string;
|
||||
summary?: string;
|
||||
id?: string;
|
||||
} = {}) {
|
||||
const realNow = Date.now();
|
||||
emitSocketEvent({
|
||||
event: "ACTIVITY_LOGGED",
|
||||
workspace_id: overrides.workspaceId ?? "ws-1",
|
||||
timestamp: new Date(realNow).toISOString(),
|
||||
payload: {
|
||||
id: overrides.id ?? `act-${Math.random().toString(36).slice(2)}`,
|
||||
activity_type: overrides.activityType ?? "agent_log",
|
||||
source_id: null,
|
||||
target_id: null,
|
||||
method: null,
|
||||
summary: overrides.summary ?? "live-pushed",
|
||||
status: "ok",
|
||||
created_at: new Date(realNow - 5_000).toISOString(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
it("WS push for matching workspace prepends to the list with NO HTTP call", async () => {
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText(/0 activities|no activity/i)).toBeTruthy();
|
||||
});
|
||||
mockGet.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitActivity({ summary: "live-row-from-bus" });
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText(/live-row-from-bus/)).toBeTruthy();
|
||||
});
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push for a different workspace is ignored", async () => {
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
await waitFor(() => screen.getByText(/no activity/i));
|
||||
|
||||
await act(async () => {
|
||||
emitActivity({
|
||||
workspaceId: "ws-other",
|
||||
summary: "should-not-render-other-ws",
|
||||
});
|
||||
});
|
||||
|
||||
expect(screen.queryByText(/should-not-render-other-ws/)).toBeNull();
|
||||
});
|
||||
|
||||
it("WS push respects the active filter — non-matching activity_type is ignored", async () => {
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
await waitFor(() => screen.getByText(/no activity/i));
|
||||
|
||||
// Apply "Tasks" filter.
|
||||
clickButton(/tasks/i);
|
||||
await waitFor(() => {
|
||||
expect(
|
||||
screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"),
|
||||
).toBe("true");
|
||||
});
|
||||
|
||||
// Push an a2a_send (does NOT match task_update filter).
|
||||
await act(async () => {
|
||||
emitActivity({
|
||||
activityType: "a2a_send",
|
||||
summary: "should-not-render-filter-mismatch",
|
||||
});
|
||||
});
|
||||
|
||||
expect(
|
||||
screen.queryByText(/should-not-render-filter-mismatch/),
|
||||
).toBeNull();
|
||||
});
|
||||
|
||||
it("WS push respects the active filter — matching activity_type is rendered", async () => {
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
await waitFor(() => screen.getByText(/no activity/i));
|
||||
|
||||
clickButton(/tasks/i);
|
||||
await waitFor(() => {
|
||||
expect(
|
||||
screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"),
|
||||
).toBe("true");
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
emitActivity({
|
||||
activityType: "task_update",
|
||||
summary: "task-filter-match",
|
||||
});
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText(/task-filter-match/)).toBeTruthy();
|
||||
});
|
||||
});
|
||||
|
||||
it("WS push while autoRefresh is paused is ignored", async () => {
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
await waitFor(() => screen.getByText(/no activity/i));
|
||||
|
||||
// Toggle Live → Paused.
|
||||
clickButton(/live/i);
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText(/Paused/)).toBeTruthy();
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
emitActivity({ summary: "should-not-render-paused" });
|
||||
});
|
||||
|
||||
expect(screen.queryByText(/should-not-render-paused/)).toBeNull();
|
||||
});
|
||||
|
||||
it("WS push for a row already in the list is deduped (no double-render)", async () => {
|
||||
// Bootstrap with one row — same id as the WS push to trigger dedup.
|
||||
mockGet.mockResolvedValueOnce([
|
||||
makeEntry({ id: "shared-id", summary: "bootstrap-summary" }),
|
||||
]);
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
await waitFor(() => {
|
||||
expect(screen.getByText(/bootstrap-summary/)).toBeTruthy();
|
||||
});
|
||||
mockGet.mockClear();
|
||||
|
||||
// Push a row with the SAME id but a different summary — must not
|
||||
// render the new summary; original row stays.
|
||||
await act(async () => {
|
||||
emitActivity({
|
||||
id: "shared-id",
|
||||
summary: "should-not-replace-existing",
|
||||
});
|
||||
});
|
||||
|
||||
expect(screen.queryByText(/should-not-replace-existing/)).toBeNull();
|
||||
// Also verify count didn't grow.
|
||||
expect(screen.getByText(/1 activities/)).toBeTruthy();
|
||||
});
|
||||
|
||||
it("does NOT poll on a 5s interval after mount (post-#61)", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
render(<ActivityTab workspaceId="ws-1" />);
|
||||
// Drain the mount-time bootstrap promise.
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
});
|
||||
const callsAfterBootstrap = mockGet.mock.calls.length;
|
||||
expect(callsAfterBootstrap).toBeGreaterThanOrEqual(1);
|
||||
|
||||
// Pre-#61: a 30s clock advance fires 6 more polls. Post-#61: 0.
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(30_000);
|
||||
});
|
||||
expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// ── Suite 7: Activity count ───────────────────────────────────────────────────
|
||||
|
||||
describe("ActivityTab — activity count", () => {
|
||||
|
||||
@ -1,18 +1,28 @@
|
||||
// @vitest-environment jsdom
|
||||
/**
|
||||
* CommunicationOverlay tests — pin the rate-limit fix shipped 2026-05-04.
|
||||
* CommunicationOverlay tests — pin both the 2026-05-04 fan-out cap fix
|
||||
* AND the 2026-05-07 polling → ACTIVITY_LOGGED-subscriber refactor
|
||||
* (issue #61 stage 1).
|
||||
*
|
||||
* The overlay polls /workspaces/:id/activity?limit=5 for each online
|
||||
* workspace. Pre-fix it (a) polled regardless of visibility and (b)
|
||||
* fanned out to 6 workspaces every 10s. With 8+ workspaces a user
|
||||
* triggered sustained 429s (server-side rate limit is 600 req/min/IP).
|
||||
* The overlay used to poll /workspaces/:id/activity?limit=5 on a 30s
|
||||
* interval per online workspace (capped at 3). Post-#61: it bootstraps
|
||||
* once on mount via the same HTTP path (cap of 3 retained), then
|
||||
* subscribes to ACTIVITY_LOGGED via the global socket bus for live
|
||||
* updates. No interval poll.
|
||||
*
|
||||
* These tests pin:
|
||||
* 1. Fan-out cap of 3 — even with 6 online nodes, only 3 fetches
|
||||
* 2. Visibility gate — when collapsed, no polling
|
||||
* 1. Bootstrap fan-out cap of 3 — even with 6 online nodes, only 3
|
||||
* HTTP fetches on mount.
|
||||
* 2. Visibility gate — when collapsed, no HTTP fetches; re-open
|
||||
* re-bootstraps.
|
||||
* 3. NO interval polling — advancing the clock past 30s does not fire
|
||||
* additional HTTP calls.
|
||||
* 4. WS push extends the rendered list without firing any HTTP call.
|
||||
* 5. WS push for an offline workspace is ignored.
|
||||
* 6. WS push for a non-comm activity_type is ignored.
|
||||
*
|
||||
* If a future refactor pushes either dial back up, CI fails before
|
||||
* the regression hits a paying tenant.
|
||||
* If a future refactor regresses any of these, CI fails before the
|
||||
* regression hits a paying tenant.
|
||||
*/
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import { render, cleanup, act, fireEvent } from "@testing-library/react";
|
||||
@ -23,7 +33,7 @@ vi.mock("@/lib/api", () => ({
|
||||
api: { get: vi.fn() },
|
||||
}));
|
||||
|
||||
// Six online nodes — enough to verify the cap of 3.
|
||||
// Six online nodes — enough to verify the bootstrap cap of 3.
|
||||
const mockStoreState = {
|
||||
selectedNodeId: null as string | null,
|
||||
nodes: [
|
||||
@ -56,6 +66,10 @@ vi.mock("@/lib/design-tokens", () => ({
|
||||
// ── Imports (after mocks) ─────────────────────────────────────────────────────
|
||||
|
||||
import { api } from "@/lib/api";
|
||||
import {
|
||||
emitSocketEvent,
|
||||
_resetSocketEventListenersForTests,
|
||||
} from "@/store/socket-events";
|
||||
import { CommunicationOverlay } from "../CommunicationOverlay";
|
||||
|
||||
const mockGet = vi.mocked(api.get);
|
||||
@ -66,30 +80,34 @@ beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
mockGet.mockReset();
|
||||
mockGet.mockResolvedValue([]);
|
||||
// Drop any subscribers the previous test left on the singleton bus —
|
||||
// each render adds one via useSocketEvent.
|
||||
_resetSocketEventListenersForTests();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
cleanup();
|
||||
vi.useRealTimers();
|
||||
_resetSocketEventListenersForTests();
|
||||
});
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
describe("CommunicationOverlay — fan-out cap", () => {
|
||||
it("polls at most 3 of 6 online workspaces (rate-limit floor)", async () => {
|
||||
describe("CommunicationOverlay — bootstrap fan-out cap", () => {
|
||||
it("bootstraps at most 3 of 6 online workspaces (rate-limit floor preserved post-#61)", async () => {
|
||||
await act(async () => {
|
||||
render(<CommunicationOverlay />);
|
||||
});
|
||||
// Mount fires the first poll synchronously (no interval tick yet).
|
||||
// Pre-fix: 6 calls. Post-fix: 3.
|
||||
// Mount fires the bootstrap synchronously — pre-#61 this was the
|
||||
// first poll cycle; post-#61 it's the only HTTP fetch (live updates
|
||||
// arrive via WS push). 6 nodes → 3 fetches.
|
||||
expect(mockGet).toHaveBeenCalledTimes(3);
|
||||
// Verify the calls are for the FIRST 3 online nodes (slice order).
|
||||
expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-1/activity?limit=5");
|
||||
expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-2/activity?limit=5");
|
||||
expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-3/activity?limit=5");
|
||||
});
|
||||
|
||||
it("never polls offline workspaces", async () => {
|
||||
it("never bootstraps offline workspaces", async () => {
|
||||
await act(async () => {
|
||||
render(<CommunicationOverlay />);
|
||||
});
|
||||
@ -99,40 +117,39 @@ describe("CommunicationOverlay — fan-out cap", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("CommunicationOverlay — cadence", () => {
|
||||
it("uses 30s interval cadence (was 10s pre-fix)", async () => {
|
||||
describe("CommunicationOverlay — no interval polling (post-#61)", () => {
|
||||
// The pre-#61 implementation re-fetched every 30s per workspace.
|
||||
// Post-#61 the only HTTP path is the bootstrap on mount + on
|
||||
// visibility-toggle. This test pins the absence of any interval
|
||||
// poll: a 60s clock advance must not produce a second round of
|
||||
// fetches.
|
||||
it("does NOT poll on a 30s interval after bootstrap", async () => {
|
||||
await act(async () => {
|
||||
render(<CommunicationOverlay />);
|
||||
});
|
||||
expect(mockGet).toHaveBeenCalledTimes(3); // initial mount poll
|
||||
expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap
|
||||
mockGet.mockClear();
|
||||
|
||||
// Advance 10s — pre-fix this would fire another poll. Post-fix: silent.
|
||||
// Advance 60s — well past any plausible cadence the prior version
|
||||
// could have used.
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(10_000);
|
||||
vi.advanceTimersByTime(60_000);
|
||||
});
|
||||
expect(mockGet).toHaveBeenCalledTimes(3);
|
||||
|
||||
// Advance to 30s — interval fires.
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(20_000);
|
||||
});
|
||||
expect(mockGet).toHaveBeenCalledTimes(6); // +3 from second tick
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("CommunicationOverlay — visibility gate", () => {
|
||||
// The visibility gate is the dial that drops collapsed-panel polling
|
||||
// to ZERO. The cadence test above can't catch its removal — if a
|
||||
// refactor dropped `if (!visible) return`, the cadence test would
|
||||
// still pass because the effect would still fire every 30s.
|
||||
// The visibility gate now does two things post-#61:
|
||||
// - while closed, the WS handler short-circuits (no setComms churn)
|
||||
// - re-opening triggers a fresh bootstrap so the list reflects
|
||||
// anything that happened while the panel was collapsed
|
||||
//
|
||||
// Direct probe: render with comms-returning mock so the panel
|
||||
// actually renders (close button only exists in the expanded panel,
|
||||
// not the collapsed button-state). Click close, advance the clock,
|
||||
// assert no further fetches.
|
||||
it("stops polling after the user collapses the panel", async () => {
|
||||
// Mock returns one a2a_send so comms.length > 0 → panel renders →
|
||||
// close button accessible.
|
||||
it("stops fetching while collapsed and re-bootstraps on re-open", async () => {
|
||||
mockGet.mockResolvedValue([
|
||||
{
|
||||
id: "act-1",
|
||||
@ -150,29 +167,202 @@ describe("CommunicationOverlay — visibility gate", () => {
|
||||
const { getByLabelText } = await act(async () => {
|
||||
return render(<CommunicationOverlay />);
|
||||
});
|
||||
// Drain pending microtasks (resolves the await in fetchComms) so
|
||||
// setComms lands and the panel renders. Don't advance time — that
|
||||
// would fire the next interval tick and pollute the assertion.
|
||||
// Drain pending microtasks (resolves the await in bootstrap) so
|
||||
// setComms lands and the panel renders. Don't advance time — it's
|
||||
// not load-bearing for the gate test, but matches the pattern used
|
||||
// pre-#61 for stability.
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
});
|
||||
// Initial mount polled 3 workspaces.
|
||||
expect(mockGet).toHaveBeenCalledTimes(3);
|
||||
expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap
|
||||
mockGet.mockClear();
|
||||
|
||||
// Click the close button. Synchronous getByLabelText avoids
|
||||
// findBy's internal setTimeout (deadlocks under useFakeTimers).
|
||||
// Click close. While closed, no fetches and no WS-driven updates.
|
||||
const closeBtn = getByLabelText("Close communications panel");
|
||||
await act(async () => {
|
||||
fireEvent.click(closeBtn);
|
||||
});
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(60_000);
|
||||
});
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
|
||||
// Re-open via the collapsed button. Must trigger a fresh bootstrap.
|
||||
const openBtn = getByLabelText("Show communications panel");
|
||||
await act(async () => {
|
||||
fireEvent.click(openBtn);
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
});
|
||||
expect(mockGet).toHaveBeenCalledTimes(3); // re-bootstrap on re-open
|
||||
});
|
||||
});
|
||||
|
||||
describe("CommunicationOverlay — WS subscription (#61 stage 1 core)", () => {
|
||||
// The load-bearing post-#61 behaviour. Every test in this block must
|
||||
// verify (a) the WS push DID update the rendered comms list, and
|
||||
// (b) NO additional HTTP call was fired — the whole point of the
|
||||
// refactor is to remove the polling-driven HTTP traffic.
|
||||
function emitActivityLogged(overrides: Partial<{
|
||||
workspaceId: string;
|
||||
payload: Record<string, unknown>;
|
||||
}> = {}) {
|
||||
emitSocketEvent({
|
||||
event: "ACTIVITY_LOGGED",
|
||||
workspace_id: overrides.workspaceId ?? "ws-1",
|
||||
timestamp: new Date().toISOString(),
|
||||
payload: {
|
||||
id: `act-${Math.random().toString(36).slice(2)}`,
|
||||
activity_type: "a2a_send",
|
||||
source_id: "ws-1",
|
||||
target_id: "ws-2",
|
||||
summary: "live push",
|
||||
status: "ok",
|
||||
duration_ms: 42,
|
||||
created_at: new Date().toISOString(),
|
||||
...overrides.payload,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
it("WS push for a comm activity_type extends the rendered list with NO additional HTTP call", async () => {
|
||||
const { container } = await act(async () => {
|
||||
return render(<CommunicationOverlay />);
|
||||
});
|
||||
expect(mockGet).toHaveBeenCalledTimes(3); // bootstrap
|
||||
mockGet.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitActivityLogged({ payload: { summary: "hello" } });
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
// Two pins:
|
||||
// 1. comms list reflects the live push (look for the summary text)
|
||||
// 2. zero HTTP fetches fired during the WS path
|
||||
expect(container.textContent).toContain("hello");
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push for an offline workspace is ignored", async () => {
|
||||
const { container } = await act(async () => {
|
||||
return render(<CommunicationOverlay />);
|
||||
});
|
||||
mockGet.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitActivityLogged({
|
||||
workspaceId: "ws-offline",
|
||||
payload: { source_id: "ws-offline", summary: "should-not-render" },
|
||||
});
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
expect(container.textContent).not.toContain("should-not-render");
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push for a non-comm activity_type is ignored (e.g. delegation)", async () => {
|
||||
const { container } = await act(async () => {
|
||||
return render(<CommunicationOverlay />);
|
||||
});
|
||||
mockGet.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitActivityLogged({
|
||||
payload: {
|
||||
activity_type: "delegation",
|
||||
summary: "should-not-render-delegation",
|
||||
},
|
||||
});
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
expect(container.textContent).not.toContain("should-not-render-delegation");
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("WS push while the panel is collapsed is ignored (no churn on hidden state)", async () => {
|
||||
// Bootstrap with one comm so the panel renders → close button
|
||||
// accessible. Then collapse, emit a WS push, re-open: the rendered
|
||||
// list must come from the re-bootstrap, NOT from the WS-push that
|
||||
// arrived during the closed state. Also: nothing visible while
|
||||
// closed (the collapsed button shows only the count, not summaries).
|
||||
mockGet.mockResolvedValue([
|
||||
{
|
||||
id: "act-bootstrap",
|
||||
workspace_id: "ws-1",
|
||||
activity_type: "a2a_send",
|
||||
source_id: "ws-1",
|
||||
target_id: "ws-2",
|
||||
summary: "bootstrap-summary",
|
||||
status: "ok",
|
||||
duration_ms: 1,
|
||||
created_at: new Date().toISOString(),
|
||||
},
|
||||
]);
|
||||
const { getByLabelText, container } = await act(async () => {
|
||||
return render(<CommunicationOverlay />);
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
// Collapse.
|
||||
const closeBtn = getByLabelText("Close communications panel");
|
||||
await act(async () => {
|
||||
fireEvent.click(closeBtn);
|
||||
});
|
||||
|
||||
// Advance well past the old 30s cadence — while collapsed, neither a fetch nor a WS-driven update should land.
|
||||
// Bootstrap mock returns nothing on the re-open path so we can
|
||||
// distinguish "WS push leaked through the gate" from "re-bootstrap
|
||||
// refilled the list."
|
||||
mockGet.mockReset();
|
||||
mockGet.mockResolvedValue([]);
|
||||
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(60_000);
|
||||
emitActivityLogged({
|
||||
payload: { summary: "leaked-while-closed" },
|
||||
});
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
// Closed state: rendered DOM must not show any push-derived text.
|
||||
expect(container.textContent).not.toContain("leaked-while-closed");
|
||||
});
|
||||
|
||||
it("non-ACTIVITY_LOGGED events are ignored (e.g. WORKSPACE_OFFLINE)", async () => {
|
||||
const { container } = await act(async () => {
|
||||
return render(<CommunicationOverlay />);
|
||||
});
|
||||
mockGet.mockClear();
|
||||
|
||||
await act(async () => {
|
||||
emitSocketEvent({
|
||||
event: "WORKSPACE_OFFLINE",
|
||||
workspace_id: "ws-1",
|
||||
timestamp: new Date().toISOString(),
|
||||
payload: { summary: "should-not-render-event" },
|
||||
});
|
||||
});
|
||||
await act(async () => {
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
expect(container.textContent).not.toContain("should-not-render-event");
|
||||
expect(mockGet).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
"use client";
|
||||
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import { useState, useEffect, useCallback, useRef } from "react";
|
||||
import { api } from "@/lib/api";
|
||||
import { ConversationTraceModal } from "@/components/ConversationTraceModal";
|
||||
import { useSocketEvent } from "@/hooks/useSocketEvent";
|
||||
import { type ActivityEntry } from "@/types/activity";
|
||||
import { useWorkspaceName } from "@/hooks/useWorkspaceName";
|
||||
import { inferA2AErrorHint } from "./chat/a2aErrorHint";
|
||||
@ -48,6 +49,15 @@ export function ActivityTab({ workspaceId }: Props) {
|
||||
const [traceOpen, setTraceOpen] = useState(false);
|
||||
const resolveName = useWorkspaceName();
|
||||
|
||||
// Refs let the WS handler read the latest filter / autoRefresh
|
||||
// selection without re-subscribing on every state change. The bus
|
||||
// listener is registered exactly once per mount via useSocketEvent's
|
||||
// ref-internal pattern; subscriber-side filtering reads from these.
|
||||
const filterRef = useRef(filter);
|
||||
filterRef.current = filter;
|
||||
const autoRefreshRef = useRef(autoRefresh);
|
||||
autoRefreshRef.current = autoRefresh;
|
||||
|
||||
const loadActivities = useCallback(async () => {
|
||||
try {
|
||||
const typeParam = filter !== "all" ? `?type=${filter}` : "";
|
||||
@ -66,11 +76,58 @@ export function ActivityTab({ workspaceId }: Props) {
|
||||
loadActivities();
|
||||
}, [loadActivities]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!autoRefresh) return;
|
||||
const interval = setInterval(loadActivities, 5000);
|
||||
return () => clearInterval(interval);
|
||||
}, [loadActivities, autoRefresh]);
|
||||
// Live-update path (issue #61 stage 3, replaces the 5s setInterval).
|
||||
// ACTIVITY_LOGGED events from this workspace prepend to the rendered
|
||||
// list — dedup by id so a WS push + a bootstrap/refresh reply don't
|
||||
// double-render the same row.
|
||||
//
|
||||
// Honours the user's autoRefresh toggle: when paused, live updates
|
||||
// are dropped until the user re-enables Live (or hits Refresh, which
|
||||
// re-bootstraps via loadActivities).
|
||||
//
|
||||
// Filter awareness: matches the server-side `?type=<filter>`
|
||||
// semantics so the panel doesn't show rows the user excluded.
|
||||
useSocketEvent((msg) => {
|
||||
if (!autoRefreshRef.current) return;
|
||||
if (msg.event !== "ACTIVITY_LOGGED") return;
|
||||
if (msg.workspace_id !== workspaceId) return;
|
||||
|
||||
const p = (msg.payload || {}) as Record<string, unknown>;
|
||||
const activityType = (p.activity_type as string) || "";
|
||||
|
||||
const f = filterRef.current;
|
||||
if (f !== "all" && activityType !== f) return;
|
||||
|
||||
const entry: ActivityEntry = {
|
||||
id:
|
||||
(p.id as string) ||
|
||||
`ws-push-${msg.timestamp || Date.now()}-${msg.workspace_id}`,
|
||||
workspace_id: msg.workspace_id,
|
||||
activity_type: activityType,
|
||||
source_id: (p.source_id as string | null) ?? null,
|
||||
target_id: (p.target_id as string | null) ?? null,
|
||||
method: (p.method as string | null) ?? null,
|
||||
summary: (p.summary as string | null) ?? null,
|
||||
request_body: (p.request_body as Record<string, unknown> | null) ?? null,
|
||||
response_body:
|
||||
(p.response_body as Record<string, unknown> | null) ?? null,
|
||||
duration_ms: (p.duration_ms as number | null) ?? null,
|
||||
status: (p.status as string) || "ok",
|
||||
error_detail: (p.error_detail as string | null) ?? null,
|
||||
created_at:
|
||||
(p.created_at as string) ||
|
||||
msg.timestamp ||
|
||||
new Date().toISOString(),
|
||||
};
|
||||
|
||||
setActivities((prev) => {
|
||||
// Dedup by id — a row that arrived via the bootstrap fetch and
|
||||
// also fires ACTIVITY_LOGGED from a delayed server-side hook
|
||||
// must render exactly once.
|
||||
if (prev.some((e) => e.id === entry.id)) return prev;
|
||||
return [entry, ...prev];
|
||||
});
|
||||
});
|
||||
|
||||
return (
|
||||
<div className="flex flex-col h-full">
|
||||
|
||||
147
docs/engineering/ratelimit-observability.md
Normal file
@ -0,0 +1,147 @@
|
||||
# Rate-limit observability runbook
|
||||
|
||||
> Companion to issue #64 ("RATE_LIMIT default re-tune analysis"). After
|
||||
> #60 deployed the per-tenant `keyFor` keying, the right RATE_LIMIT
|
||||
> default became data-dependent. This runbook documents the metrics +
|
||||
> queries an operator should run to confirm whether the current 600
|
||||
> req/min/key default is correct, too tight, or too loose.
|
||||
|
||||
## What's already exposed
|
||||
|
||||
The workspace-server's existing Prometheus middleware
|
||||
(`workspace-server/internal/metrics/metrics.go`) tracks every request
|
||||
on every path:
|
||||
|
||||
```
|
||||
molecule_http_requests_total{method, path, status} counter
|
||||
molecule_http_request_duration_seconds_total{method,path,status} counter
|
||||
```
|
||||
|
||||
Path is the matched route pattern (`/workspaces/:id/activity` etc), so
|
||||
high-cardinality workspace UUIDs do not explode the label space.
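
A quick sanity check that the counter is present with the expected labels (sketch; the Prometheus address is a placeholder):

```sh
promtool query instant http://prometheus:9090 \
  'topk(10, sum by (path) (rate(molecule_http_requests_total[5m])))'
```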
|
||||
|
||||
The rate limiter middleware (#60, `workspace-server/internal/middleware/ratelimit.go`)
|
||||
also stamps every response with `X-RateLimit-Limit`, `X-RateLimit-Remaining`,
|
||||
and `X-RateLimit-Reset`. Operators with browser-side or proxy-side
|
||||
header capture can read per-request bucket state directly.
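
For a one-off spot check without any proxy capture, the bucket headers can be read straight off an authenticated response (sketch; the host, workspace id, and token are placeholders):

```sh
curl -sS -D - -o /dev/null \
  -H "Authorization: Bearer $WORKSPACE_TOKEN" \
  "https://<tenant-host>/workspaces/<workspace-id>/activity?limit=1" \
  | grep -i '^x-ratelimit-'
```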
|
||||
|
||||
No new instrumentation is needed for #64's acceptance criteria. The
|
||||
metric surface is sufficient — this runbook just collects the queries.
|
||||
|
||||
## Queries to run after #60 deploys
|
||||
|
||||
### 1. Is the bucket actually firing 429s?
|
||||
|
||||
```promql
|
||||
sum(rate(molecule_http_requests_total{status="429"}[5m]))
|
||||
```
|
||||
|
||||
If this is zero on a given tenant, the bucket isn't being hit. If it's
|
||||
sustained > 1/min, dig in.
|
||||
|
||||
### 2. Which routes attract 429s?
|
||||
|
||||
```promql
|
||||
topk(
|
||||
10,
|
||||
sum by (path) (
|
||||
rate(molecule_http_requests_total{status="429"}[5m])
|
||||
)
|
||||
)
|
||||
```
|
||||
|
||||
Expected shape post-#60:
|
||||
- `/workspaces/:id/activity` should be near zero — the canvas no longer
|
||||
polls it on a 30s/60s/5s cadence (PRs #69 / #71 / #76).
|
||||
- Probe / health / heartbeat paths should be ~0 (those routes have a
|
||||
separate IP-fallback bucket).
|
||||
|
||||
If `/workspaces/:id/activity` 429s persist post-PRs-69/71/76 deploy, the
|
||||
canvas isn't running the WS-subscriber path — investigate WS health
|
||||
on that tenant.
|
||||
|
||||
### 3. Per-bucket-key inference (no direct exposure today)
|
||||
|
||||
The bucket map itself is in-memory only; we deliberately do **not**
|
||||
expose `org:<uuid>` ↔ remaining-tokens because that map can include
|
||||
SHA-256 hashes of bearer tokens. A tenant that wants per-key visibility
|
||||
should rely on response headers (`X-RateLimit-Remaining` on every
|
||||
response from a given session is the bucket's view of that session).
|
||||
|
||||
If you genuinely need server-side per-bucket counts for triage,
|
||||
file a follow-up — the proper shape is a `/internal/ratelimit-stats`
|
||||
endpoint that emits **counts per key prefix only** (e.g. `org:`, `tok:`,
|
||||
`ip:`), never the key payloads. Don't roll that ad-hoc; it's a security
|
||||
review surface.
|
||||
|
||||
## Decision tree for the re-tune
|
||||
|
||||
After 14 days of production traffic on a tenant, look at the queries
|
||||
above and walk this tree:
|
||||
|
||||
```
|
||||
Q1: Is the 429 rate sustained > 0.1/sec on any tenant?
|
||||
├─ NO → The 600 default has comfortable headroom. Either keep it,
|
||||
│ or lower it carefully (300) ONLY if you have a documented
|
||||
│ reason (e.g. a misbehaving client we want to throttle harder).
|
||||
│ Default to "no change" — see #64 for the math.
|
||||
└─ YES → Q2.
|
||||
|
||||
Q2: Is the 429 rate concentrated on ONE tenant or spread across many?
|
||||
├─ ONE tenant → Operator override: set RATE_LIMIT=1200 or 1800 on that
|
||||
│ tenant's box. Document in the tenant's ops note. The
|
||||
│ default does not need to change.
|
||||
└─ MANY tenants → Q3.
|
||||
|
||||
Q3: Are the 429s on a route that polls (e.g. /activity / /peers)?
|
||||
├─ YES → Confirm PRs #69, #71, #76 have actually deployed to those
|
||||
│ tenants. If they have and 429s persist, the canvas may have
|
||||
│ a regression — do not raise RATE_LIMIT. File a canvas issue.
|
||||
└─ NO → 429s on mutating routes mean genuine load. Raise the default
|
||||
to 1200 in `workspace-server/internal/router/router.go:54`.
|
||||
Same PR should attach: the metric chart, the time window,
|
||||
and a paragraph explaining what changed in our traffic shape.
|
||||
```
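
Q2 can be answered from the same metric surface: grouping the 429 rate by instance shows whether one tenant box dominates or the load is spread (sketch; the Prometheus address is a placeholder):

```sh
promtool query instant http://prometheus:9090 \
  'sum by (instance) (rate(molecule_http_requests_total{status="429"}[1h]))'
```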
|
||||
|
||||
## Alert rule template (drop-in for Prometheus)
|
||||
|
||||
```yaml
|
||||
# Sustained 429s — this rule is the SLO trip-wire. If it fires, walk the
# decision tree above. NB: the issue #64 acceptance criterion is "two
# weeks of metrics"; this alert is the inverse — it tells you something
# changed before the two weeks are up.
|
||||
groups:
|
||||
- name: workspace-server-ratelimit
|
||||
rules:
|
||||
- alert: WorkspaceServerRateLimit429Sustained
|
||||
expr: |
|
||||
sum by (instance) (
|
||||
rate(molecule_http_requests_total{status="429"}[10m])
|
||||
) > 0.1
|
||||
for: 30m
|
||||
labels:
|
||||
severity: warning
|
||||
owner: workspace-server
|
||||
annotations:
|
||||
summary: "{{ $labels.instance }} sustained 429s — see ratelimit-observability runbook"
|
||||
runbook: "https://git.moleculesai.app/molecule-ai/molecule-core/blob/main/docs/engineering/ratelimit-observability.md"
|
||||
```
|
||||
|
||||
Threshold rationale: 0.1 req/s = 6/min sustained over 10min. Below
|
||||
that, a 429 is almost certainly a transient burst that the canvas's
|
||||
retry-once handler at `canvas/src/lib/api.ts:55` already absorbs. The
|
||||
30m `for:` keeps the alert from chattering on a brief blip.
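
Before loading it, the rule file can be validated locally with promtool (the file name below is illustrative):

```sh
promtool check rules workspace-server-ratelimit.rules.yml
```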
|
||||
|
||||
## Companion probe script
|
||||
|
||||
For one-off triage when an operator can reproduce the problem in their
|
||||
own browser, `scripts/edge-429-probe.sh` (#62) reproduces a canvas-
|
||||
sized burst against a tenant subdomain and dumps each 429's response
|
||||
shape so the operator can distinguish workspace-server bucket overflow
|
||||
from CF/Vercel edge rate-limiting without dashboard access.
|
||||
|
||||
```sh
|
||||
./scripts/edge-429-probe.sh hongming.moleculesai.app --burst 80 --out /tmp/edge.txt
|
||||
```
|
||||
|
||||
The script's report header explains how to read the output.
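
For reference, a data line in the report has roughly this shape (values illustrative; a JSON content type plus populated x_ratelimit_* fields points at the workspace-server bucket rather than the edge):

```
label=activity-1-17 status=429 size=48 time=0.031 server= cf_ray= x_vercel= retry_after=1 content_type=application/json x_ratelimit_limit=600 x_ratelimit_remaining=0 x_ratelimit_reset=12
```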
|
||||
155
scripts/edge-429-probe.sh
Executable file
@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env bash
|
||||
# edge-429-probe.sh — capture 429 origin (workspace-server vs CF/Vercel edge)
|
||||
# during a simulated canvas-burst against a tenant subdomain.
|
||||
#
|
||||
# Issue molecule-core#62. The post-#60 verification step asks an
|
||||
# operator with CF/Vercel dashboard access to confirm whether the
|
||||
# layout-chunk 429s observed in DevTools were:
|
||||
# (a) workspace-server bucket overflow (closes once #60 deploys), or
|
||||
# (b) actual edge-layer rate-limiting (CF or Vercel).
|
||||
#
|
||||
# This script doesn't need dashboard access. It reproduces the burst
|
||||
# pattern locally and dumps every 429's response shape so the operator
|
||||
# can distinguish (a) from (b) by inspection: workspace-server emits a
|
||||
# JSON body, CF emits HTML, Vercel emits a different HTML. Headers tell
|
||||
# the same story (cf-ray vs x-vercel-*).
|
||||
#
|
||||
# Usage:
|
||||
# ./scripts/edge-429-probe.sh <tenant-host> [--burst N] [--waves N] [--pause SECS] [--out file]
|
||||
#
|
||||
# Example:
|
||||
# ./scripts/edge-429-probe.sh hongming.moleculesai.app --burst 80 --out /tmp/edge.txt
|
||||
#
|
||||
# The script is read-only against the target — it only issues GETs to
|
||||
# public-by-design endpoints. No mutating requests, no credential use.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
# ── Help / usage handling first, before positional capture ────────────────────
|
||||
case "${1:-}" in
|
||||
-h|--help|"")
|
||||
sed -n '/^# edge-429-probe.sh/,/^$/p' "$0" | sed 's/^# \{0,1\}//'
|
||||
exit 0
|
||||
;;
|
||||
esac
|
||||
|
||||
HOST="$1"; shift
|
||||
BURST=80
|
||||
WAVES=3
|
||||
WAVE_PAUSE=2
|
||||
OUT=""
|
||||
|
||||
while [ "${1:-}" != "" ]; do
|
||||
case "$1" in
|
||||
--burst) BURST="$2"; shift 2 ;;
|
||||
--waves) WAVES="$2"; shift 2 ;;
|
||||
--pause) WAVE_PAUSE="$2"; shift 2 ;;
|
||||
--out) OUT="$2"; shift 2 ;;
|
||||
-h|--help)
|
||||
sed -n '/^# edge-429-probe.sh/,/^$/p' "$0" | sed 's/^# \{0,1\}//'
|
||||
exit 0
|
||||
;;
|
||||
*) echo "unknown arg: $1" >&2; exit 2 ;;
|
||||
esac
|
||||
done
|
||||
|
||||
# ── Endpoint discovery ────────────────────────────────────────────────────────
|
||||
echo "→ Discovering a layout-chunk URL from canvas root..." >&2
|
||||
ROOT_BODY=$(curl -fsSL --max-time 10 "https://${HOST}/" 2>/dev/null || true)
|
||||
LAYOUT_PATH=$(echo "$ROOT_BODY" \
|
||||
| grep -oE '/_next/static/chunks/layout-[A-Za-z0-9_-]+\.js' \
|
||||
| head -1 || true)
|
||||
if [ -z "$LAYOUT_PATH" ]; then
|
||||
LAYOUT_PATH="/_next/static/chunks/layout-probe-not-found.js"
|
||||
echo " (no layout chunk discovered — using sentinel path; 404 on this is expected)" >&2
|
||||
else
|
||||
echo " layout chunk: $LAYOUT_PATH" >&2
|
||||
fi
|
||||
|
||||
# Probe URL: a generic activity endpoint. The rate-limiter middleware
|
||||
# runs BEFORE workspace-id validation, so unauth/invalid-id requests
|
||||
# still hit the bucket.
|
||||
ACTIVITY_PATH="/workspaces/00000000-0000-0000-0000-000000000000/activity?probe=edge-429"
|
||||
|
||||
# ── Fire one curl per request, append a status record to a temp file ─────────
# Requests are fired as backgrounded curls inside run_burst (plain `&` plus
# `wait`) rather than via an exported function handed to xargs, so the
# `export -f` portability pitfalls don't apply. Each output line is a
# parseable record; failed curls emit a curl_err record so request volume
# is preserved.
|
||||
TMP_RESULTS="$(mktemp -t edge-429-probe.XXXXXX)"
|
||||
trap 'rm -f "$TMP_RESULTS"' EXIT
|
||||
|
||||
run_burst() {
|
||||
# $1 = path; $2 = label; $3 = wave_id
|
||||
local path="$1" label="$2" wave="$3"
|
||||
local i
|
||||
for i in $(seq 1 "$BURST"); do
|
||||
{
|
||||
out=$(curl -sS --max-time 10 -o /dev/null \
|
||||
-w 'status=%{http_code} size=%{size_download} time=%{time_total} server=%{header.server} cf_ray=%{header.cf-ray} x_vercel=%{header.x-vercel-id} retry_after=%{header.retry-after} content_type=%{header.content-type} x_ratelimit_limit=%{header.x-ratelimit-limit} x_ratelimit_remaining=%{header.x-ratelimit-remaining} x_ratelimit_reset=%{header.x-ratelimit-reset}\n' \
|
||||
"https://${HOST}${path}" 2>/dev/null) || out="status=curl_err"
|
||||
printf 'label=%s-%s-%s %s\n' "$label" "$wave" "$i" "$out" >> "$TMP_RESULTS"
|
||||
} &
|
||||
done
|
||||
wait
|
||||
}
|
||||
|
||||
emit() {
|
||||
if [ -n "$OUT" ]; then
|
||||
printf '%s\n' "$*" >> "$OUT"
|
||||
else
|
||||
printf '%s\n' "$*"
|
||||
fi
|
||||
}
|
||||
|
||||
if [ -n "$OUT" ]; then : > "$OUT"; fi
|
||||
|
||||
emit "# edge-429-probe report"
|
||||
emit "# host=$HOST burst=$BURST waves=$WAVES pause=${WAVE_PAUSE}s"
|
||||
emit "# layout_path=$LAYOUT_PATH"
|
||||
emit "# activity_path=$ACTIVITY_PATH"
|
||||
emit "# generated=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
|
||||
emit ""
|
||||
|
||||
for wave in $(seq 1 "$WAVES"); do
|
||||
emit "## wave $wave"
|
||||
: > "$TMP_RESULTS"
|
||||
run_burst "$LAYOUT_PATH" "layout" "$wave"
|
||||
run_burst "$ACTIVITY_PATH" "activity" "$wave"
|
||||
while read -r line; do
|
||||
emit " $line"
|
||||
done < "$TMP_RESULTS"
|
||||
if [ "$wave" -lt "$WAVES" ]; then
|
||||
sleep "$WAVE_PAUSE"
|
||||
fi
|
||||
done
|
||||
|
||||
emit ""
|
||||
emit "## summary — how to read the report"
|
||||
emit "# status=429 + content_type starts with application/json + x_ratelimit_limit set"
|
||||
emit "# => workspace-server bucket overflow. Closes when #60 deploys."
|
||||
emit "# status=429 + cf_ray set + content_type=text/html"
|
||||
emit "# => Cloudflare WAF / rate-limit. Audit dashboard rules per #62."
|
||||
emit "# status=429 + x_vercel set + content_type=text/html"
|
||||
emit "# => Vercel edge / Bot Fight Mode. Audit Vercel project per #62."
|
||||
emit "# status=429 with no server/cf_ray/x_vercel"
|
||||
emit "# => corporate proxy or VPN. Not actionable in this repo."
|
||||
|
||||
if [ -n "$OUT" ]; then
|
||||
echo "→ Report written to $OUT" >&2
|
||||
# Match only data lines (begin with two-space indent + "label="),
|
||||
# not the summary's reference text which also mentions "status=429".
|
||||
# grep -c outputs "0" + exits 1 when zero matches; `|| true` masks
|
||||
# the exit status so set -e doesn't trip without losing the count.
|
||||
total=$(grep -c '^ label=' "$OUT" 2>/dev/null || true)
|
||||
total429=$(grep -c '^ label=.*status=429' "$OUT" 2>/dev/null || true)
|
||||
total=${total:-0}
|
||||
total429=${total429:-0}
|
||||
echo "→ Totals: ${total429} of ${total} requests returned 429" >&2
|
||||
if [ "${total429}" -gt 0 ]; then
|
||||
echo "→ Per-label 429 counts:" >&2
|
||||
grep '^ label=.*status=429' "$OUT" \
|
||||
| sed -E 's/^ label=([^-]+).*/ \1/' \
|
||||
| sort | uniq -c >&2
|
||||
fi
|
||||
fi
|
||||
@ -5,17 +5,19 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// RateLimiter implements a simple token bucket rate limiter per IP.
|
||||
// RateLimiter implements a token bucket rate limiter keyed by tenant
|
||||
// identity (org id, then bearer token, then client IP — see keyFor).
|
||||
type RateLimiter struct {
|
||||
mu sync.Mutex
|
||||
buckets map[string]*bucket
|
||||
rate int // tokens per interval
|
||||
mu sync.Mutex
|
||||
buckets map[string]*bucket
|
||||
rate int // tokens per interval
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
@ -42,9 +44,9 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
|
||||
case <-ticker.C:
|
||||
rl.mu.Lock()
|
||||
cutoff := time.Now().Add(-10 * time.Minute)
|
||||
for ip, b := range rl.buckets {
|
||||
for k, b := range rl.buckets {
|
||||
if b.lastReset.Before(cutoff) {
|
||||
delete(rl.buckets, ip)
|
||||
delete(rl.buckets, k)
|
||||
}
|
||||
}
|
||||
rl.mu.Unlock()
|
||||
@ -54,29 +56,73 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
|
||||
return rl
|
||||
}
|
||||
|
||||
// Middleware returns a Gin middleware that rate limits by client IP.
|
||||
// keyFor returns the bucket identifier for this request. Priority:
|
||||
//
|
||||
// 1. X-Molecule-Org-Id header — when present (CP-routed SaaS traffic),
|
||||
// isolates tenants from each other regardless of the upstream proxy IP
|
||||
// they all share.
|
||||
// 2. SHA-256 of Authorization Bearer token — when present (per-workspace
|
||||
// bearer, ADMIN_TOKEN, org-scoped API token). On a per-tenant Caddy
|
||||
// box where the org-id header isn't attached, this still distinguishes
|
||||
// distinct user sessions on the same egress IP.
|
||||
// 3. ClientIP() — anonymous probes, /health scrapes, registry boot
|
||||
// signals (when SetTrustedProxies(nil) is in effect, this is the
|
||||
// direct TCP RemoteAddr — fine for the probe surface, not fine as a
|
||||
// primary key behind a proxy, hence the priority order above).
|
||||
//
|
||||
// Mixing these namespaces is fine because they never collide: org ids
|
||||
// are UUIDs ("org:..."), token hashes are 64-char hex ("tok:..."), IPs
|
||||
// contain dots/colons ("ip:...").
|
||||
//
|
||||
// Security note on X-Molecule-Org-Id spoofing: the rate limiter runs
|
||||
// BEFORE TenantGuard, so the org-id value here is unvalidated. A caller
|
||||
// reaching workspace-server directly could spoof the header to drain
|
||||
// another org's bucket. In production this surface is closed by the
|
||||
// CP/Caddy front: tenant SGs reject :8080 from the public internet, and
|
||||
// CP rewrites the header to the verified org. If a future deployment
|
||||
// exposes :8080 directly, validate the org-id (e.g. against
|
||||
// MOLECULE_ORG_ID) before keying on it, or move this middleware after
|
||||
// TenantGuard. The token-hash and IP fallbacks are unspoofable.
|
||||
//
|
||||
// Issue #59 — replaces the previous IP-only keying that silently
|
||||
// collapsed all canvas traffic into one bucket once #179 disabled
|
||||
// proxy-header trust. See the issue for the deployment-shape analysis.
|
||||
func (rl *RateLimiter) keyFor(c *gin.Context) string {
|
||||
if orgID := strings.TrimSpace(c.GetHeader("X-Molecule-Org-Id")); orgID != "" {
|
||||
return "org:" + orgID
|
||||
}
|
||||
if tok := bearerFromHeader(c.GetHeader("Authorization")); tok != "" {
|
||||
return "tok:" + tokenKey(tok)
|
||||
}
|
||||
return "ip:" + c.ClientIP()
|
||||
}
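
// NOTE (editorial sketch, not part of this change): keyFor relies on two
// helpers defined elsewhere in this package. As implied by the doc comment
// above and the hex-SHA-256 expectation in ratelimit_keyfor_test.go, they
// behave roughly like the following; the real implementations may differ.
//
//	func bearerFromHeader(h string) string {
//		const prefix = "Bearer "
//		if strings.HasPrefix(h, prefix) {
//			return strings.TrimSpace(strings.TrimPrefix(h, prefix))
//		}
//		return ""
//	}
//
//	func tokenKey(tok string) string {
//		return fmt.Sprintf("%x", sha256.Sum256([]byte(tok)))
//	}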
|
||||
|
||||
// Middleware returns a Gin middleware that rate limits per caller. The
|
||||
// caller-key derivation lives in keyFor — see that function's doc for
|
||||
// the priority list and rationale.
|
||||
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
// Tier-1b dev-mode hatch — same gate as AdminAuth / WorkspaceAuth /
|
||||
// discovery. On a local single-user Docker setup the 600-req/min
|
||||
// bucket fills fast: a 15-workspace canvas + activity polling +
|
||||
// approvals polling + A2A overlay + initial hydration all share
|
||||
// one IP bucket, so a minute of active use can trip 429 and blank
|
||||
// the page. Gated by MOLECULE_ENV=development + empty ADMIN_TOKEN
|
||||
// so SaaS production keeps the bucket.
|
||||
// approvals polling + A2A overlay + initial hydration all land in
|
||||
// one bucket (whichever keyFor returns — typically the dev user's
|
||||
// IP or shared admin token), so a minute of active use can trip
|
||||
// 429 and blank the page. Gated by MOLECULE_ENV=development +
|
||||
// empty ADMIN_TOKEN so SaaS production keeps the bucket.
|
||||
if isDevModeFailOpen() {
|
||||
c.Header("X-RateLimit-Limit", "unlimited")
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
|
||||
ip := c.ClientIP()
|
||||
key := rl.keyFor(c)
|
||||
|
||||
rl.mu.Lock()
|
||||
b, exists := rl.buckets[ip]
|
||||
b, exists := rl.buckets[key]
|
||||
if !exists {
|
||||
b = &bucket{tokens: rl.rate, lastReset: time.Now()}
|
||||
rl.buckets[ip] = b
|
||||
rl.buckets[key] = b
|
||||
}
|
||||
|
||||
// Reset tokens if interval has passed
|
||||
|
||||
303
workspace-server/internal/middleware/ratelimit_keyfor_test.go
Normal file
@ -0,0 +1,303 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"go/ast"
|
||||
"go/parser"
|
||||
"go/token"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// newTestLimiterForKeyFor — same shape as newTestLimiter in ratelimit_test.go
|
||||
// but exposes the *gin.Engine and lets the caller inject headers per-request.
|
||||
func newTestLimiterForKeyFor(t *testing.T, rate int) *gin.Engine {
|
||||
t.Helper()
|
||||
gin.SetMode(gin.TestMode)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
rl := NewRateLimiter(rate, 5*time.Second, ctx)
|
||||
r := gin.New()
|
||||
if err := r.SetTrustedProxies(nil); err != nil {
|
||||
t.Fatalf("SetTrustedProxies: %v", err)
|
||||
}
|
||||
r.Use(rl.Middleware())
|
||||
r.GET("/x", func(c *gin.Context) { c.String(http.StatusOK, "ok") })
|
||||
return r
|
||||
}
|
||||
|
||||
// TestKeyFor_OrgIdHeaderTrumpsBearerAndIP — when X-Molecule-Org-Id is set
|
||||
// the bucket is keyed on it regardless of bearer token or IP. This is the
|
||||
// load-bearing case for the production SaaS plane: every tenant routed
|
||||
// through the same upstream proxy IP gets its own bucket because the
|
||||
// CP attaches the org-id header.
|
||||
func TestKeyFor_OrgIdHeaderTrumpsBearerAndIP(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
rl := NewRateLimiter(2, 5*time.Second, ctx)
|
||||
|
||||
c, _ := gin.CreateTestContext(httptest.NewRecorder())
|
||||
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
|
||||
c.Request.RemoteAddr = "10.0.0.1:1234"
|
||||
c.Request.Header.Set("X-Molecule-Org-Id", "org-aaa")
|
||||
c.Request.Header.Set("Authorization", "Bearer ignored-token-value")
|
||||
|
||||
got := rl.keyFor(c)
|
||||
if got != "org:org-aaa" {
|
||||
t.Errorf("keyFor with org-id header: got %q, want %q", got, "org:org-aaa")
|
||||
}
|
||||
}
|
||||
|
||||
// TestKeyFor_BearerTokenWhenNoOrgId — the per-tenant Caddy box path:
|
||||
// no org-id header (canvas same-origin), but Authorization Bearer is
|
||||
// always set by WorkspaceAuth-protected routes. Bucket keyed on the
|
||||
// SHA-256 hex of the token so distinct sessions on the same egress IP
|
||||
// get distinct buckets — and so the in-memory map can never become a
|
||||
// token dump if the process is inspected.
|
||||
func TestKeyFor_BearerTokenWhenNoOrgId(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
rl := NewRateLimiter(2, 5*time.Second, ctx)
|
||||
|
||||
c, _ := gin.CreateTestContext(httptest.NewRecorder())
|
||||
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
|
||||
c.Request.RemoteAddr = "10.0.0.1:1234"
|
||||
c.Request.Header.Set("Authorization", "Bearer secret-token-abc")
|
||||
|
||||
got := rl.keyFor(c)
|
||||
expectedHash := fmt.Sprintf("%x", sha256.Sum256([]byte("secret-token-abc")))
|
||||
if got != "tok:"+expectedHash {
|
||||
t.Errorf("keyFor with bearer-only: got %q, want %q", got, "tok:"+expectedHash)
|
||||
}
|
||||
// Critical security pin: raw token must never appear in the key.
|
||||
if strings.Contains(got, "secret-token-abc") {
|
||||
t.Errorf("keyFor leaked raw bearer token in bucket key: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestKeyFor_IPFallbackWhenNoOrgIdNoBearer — anonymous probes (no auth,
// no tenant header) fall through to ClientIP keying. This is the only
// path that depended on the pre-#179 trust-XFF behaviour and is fine
// to keep IP-keyed because the surface is just /health, /buildinfo,
// and the registry-boot endpoints.
func TestKeyFor_IPFallbackWhenNoOrgIdNoBearer(t *testing.T) {
	gin.SetMode(gin.TestMode)
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	rl := NewRateLimiter(2, 5*time.Second, ctx)

	c, _ := gin.CreateTestContext(httptest.NewRecorder())
	c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
	c.Request.RemoteAddr = "203.0.113.1:1234"

	got := rl.keyFor(c)
	// gin.ClientIP() strips the port — we just need to confirm the prefix
	// and that the IP appears.
	if !strings.HasPrefix(got, "ip:") {
		t.Errorf("keyFor without auth/org headers: got %q, want prefix %q", got, "ip:")
	}
	if !strings.Contains(got, "203.0.113.1") {
		t.Errorf("keyFor IP fallback: got %q, want to contain %q", got, "203.0.113.1")
	}
}

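// For orientation, the contract the three keyFor tests above pin can be read
// as a minimal sketch. Illustrative only: the production implementation is
// keyFor in ratelimit.go, and only the key prefixes ("org:", "tok:", "ip:")
// and the SHA-256 hashing are what these tests actually assert.
//
//	func keyForSketch(orgID, bearerToken, clientIP string) string {
//		if orgID != "" {
//			return "org:" + orgID // CP-attached tenant header wins outright
//		}
//		if bearerToken != "" {
//			// hash, never the raw token, so the bucket map is not a token dump
//			return "tok:" + fmt.Sprintf("%x", sha256.Sum256([]byte(bearerToken)))
//		}
//		return "ip:" + clientIP // anonymous fallback (health/buildinfo/registry-boot)
//	}
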
// TestRateLimit_TwoOrgsSameIP_IndependentBuckets — the load-bearing
// regression test for issue #59. Two tenants behind the same upstream
// proxy must NOT share a bucket; the production SaaS-plane outage was
// every tenant collapsing to the proxy IP and saturating one bucket.
//
// Mutation invariant: removing the org-id branch from keyFor — say,
// returning "ip:" + c.ClientIP() unconditionally — collapses both
// tenants back into one bucket and this test fails on the 3rd
// request because it would 429 instead of 200.
func TestRateLimit_TwoOrgsSameIP_IndependentBuckets(t *testing.T) {
	r := newTestLimiterForKeyFor(t, 2)

	exhaust := func(orgID string) {
		t.Helper()
		for i := 0; i < 2; i++ {
			req := httptest.NewRequest(http.MethodGet, "/x", nil)
			req.RemoteAddr = "10.0.0.1:1234" // SAME upstream proxy IP
			req.Header.Set("X-Molecule-Org-Id", orgID)
			w := httptest.NewRecorder()
			r.ServeHTTP(w, req)
			if w.Code != http.StatusOK {
				t.Fatalf("setup orgID=%s req %d: want 200, got %d", orgID, i+1, w.Code)
			}
		}
	}

	exhaust("org-aaa")
	// org-aaa is now at 0 tokens. org-bbb's bucket must be FRESH.
	req := httptest.NewRequest(http.MethodGet, "/x", nil)
	req.RemoteAddr = "10.0.0.1:1234"
	req.Header.Set("X-Molecule-Org-Id", "org-bbb")
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Fatalf("org-bbb on same IP must have its own bucket: got %d, want 200 (issue #59 regression)", w.Code)
	}

	// Confirm org-aaa is still throttled — proves we're not just opening
	// the gate to everyone.
	req = httptest.NewRequest(http.MethodGet, "/x", nil)
	req.RemoteAddr = "10.0.0.1:1234"
	req.Header.Set("X-Molecule-Org-Id", "org-aaa")
	w = httptest.NewRecorder()
	r.ServeHTTP(w, req)
	if w.Code != http.StatusTooManyRequests {
		t.Errorf("org-aaa exhausted bucket: want 429, got %d", w.Code)
	}
}

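// The router-level tests above and below drive the limiter through
// newTestLimiterForKeyFor, a helper defined earlier in this file. Its body
// is not shown here; judging only by how these tests use it, it presumably
// mounts the limiter middleware on a test router, roughly:
//
//	func newTestLimiterForKeyFor(t *testing.T, limit int) *gin.Engine {
//		t.Helper()
//		gin.SetMode(gin.TestMode)
//		ctx, cancel := context.WithCancel(context.Background())
//		t.Cleanup(cancel)
//		rl := NewRateLimiter(limit, 5*time.Second, ctx)
//		r := gin.New()
//		r.Use(rl.Middleware())
//		r.GET("/x", func(c *gin.Context) { c.Status(http.StatusOK) })
//		return r
//	}
//
// The return type and the 5*time.Second window are assumptions (mirroring the
// unit tests above); only the observable behaviour — limit requests per key
// answered 200, then 429 — is what the tests below rely on.
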
// TestRateLimit_TwoTokensSameIP_IndependentBuckets — analog of the
// org-id case for the per-tenant Caddy box: two distinct user
// sessions on the same egress IP, distinguished only by their bearer
// tokens, must get independent buckets. This was the path Hongming
// hit on hongming.moleculesai.app — a single user with multiple
// browser tabs against one workspace-server box.
func TestRateLimit_TwoTokensSameIP_IndependentBuckets(t *testing.T) {
	r := newTestLimiterForKeyFor(t, 2)

	exhaust := func(token string) {
		t.Helper()
		for i := 0; i < 2; i++ {
			req := httptest.NewRequest(http.MethodGet, "/x", nil)
			req.RemoteAddr = "127.0.0.1:1234" // local Caddy proxy — same for both
			req.Header.Set("Authorization", "Bearer "+token)
			w := httptest.NewRecorder()
			r.ServeHTTP(w, req)
			if w.Code != http.StatusOK {
				t.Fatalf("setup token=%s req %d: want 200, got %d", token, i+1, w.Code)
			}
		}
	}

	exhaust("user-a-token")
	req := httptest.NewRequest(http.MethodGet, "/x", nil)
	req.RemoteAddr = "127.0.0.1:1234"
	req.Header.Set("Authorization", "Bearer user-b-token")
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Fatalf("user-b token on same proxy IP must have its own bucket: got %d, want 200", w.Code)
	}
}

// TestRateLimit_SameOrgDifferentTokens_SharedBucket — counter-pin:
// ensure org-id keying really does collapse all tokens within one
// org into one bucket. This is the desired behaviour: a tenant that
// mints multiple tokens shouldn't be able to circumvent its quota
// by rotating tokens between requests. (The same-IP-different-org
// test above proves we don't collapse ACROSS orgs; this one proves
// we DO collapse WITHIN one org.)
func TestRateLimit_SameOrgDifferentTokens_SharedBucket(t *testing.T) {
	r := newTestLimiterForKeyFor(t, 2)

	for _, tok := range []string{"token-1", "token-2"} {
		req := httptest.NewRequest(http.MethodGet, "/x", nil)
		req.RemoteAddr = "10.0.0.1:1234"
		req.Header.Set("X-Molecule-Org-Id", "org-shared")
		req.Header.Set("Authorization", "Bearer "+tok)
		w := httptest.NewRecorder()
		r.ServeHTTP(w, req)
		if w.Code != http.StatusOK {
			t.Fatalf("setup tok=%s: want 200, got %d", tok, w.Code)
		}
	}
	// Bucket should be exhausted now — third request, even with a fresh
	// token, must 429 because the org-id is keying it.
	req := httptest.NewRequest(http.MethodGet, "/x", nil)
	req.RemoteAddr = "10.0.0.1:1234"
	req.Header.Set("X-Molecule-Org-Id", "org-shared")
	req.Header.Set("Authorization", "Bearer token-3")
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)
	if w.Code != http.StatusTooManyRequests {
		t.Errorf("rotating tokens within one org should NOT bypass the quota: got %d, want 429", w.Code)
	}
}

// TestRateLimit_Middleware_RoutesThroughKeyFor is the AST gate (mirror
// of #36/#10/#12's gates). Pins the SSOT routing invariant:
// (*RateLimiter).Middleware MUST call rl.keyFor and MUST NOT carry a
// direct c.ClientIP() call (= the parallel-impl drift this PR fixes).
//
// Mutation invariant: a future PR that re-introduces direct IP keying
// in Middleware (`ip := c.ClientIP()`) makes this test fail. That's
// the signal to either (a) extend keyFor's contract to cover the new
// case OR (b) update this gate with an explicit reason. Either way the
// drift gets a reviewer's attention before shipping.
func TestRateLimit_Middleware_RoutesThroughKeyFor(t *testing.T) {
	fset := token.NewFileSet()
	file, err := parser.ParseFile(fset, "ratelimit.go", nil, parser.ParseComments)
	if err != nil {
		t.Fatalf("parse ratelimit.go: %v", err)
	}

	var fn *ast.FuncDecl
	ast.Inspect(file, func(n ast.Node) bool {
		f, ok := n.(*ast.FuncDecl)
		if !ok {
			return true
		}
		// Match `func (rl *RateLimiter) Middleware() ...`
		if f.Name.Name != "Middleware" {
			return true
		}
		if f.Recv == nil || len(f.Recv.List) != 1 {
			return true
		}
		star, ok := f.Recv.List[0].Type.(*ast.StarExpr)
		if !ok {
			return true
		}
		if id, ok := star.X.(*ast.Ident); !ok || id.Name != "RateLimiter" {
			return true
		}
		fn = f
		return false
	})
	if fn == nil {
		t.Fatal("(*RateLimiter).Middleware not found — was it renamed? update this gate or the SSOT routing assumption")
	}

	var (
		callsKeyFor   bool
		callsClientIP bool
	)
	ast.Inspect(fn.Body, func(n ast.Node) bool {
		call, ok := n.(*ast.CallExpr)
		if !ok {
			return true
		}
		sel, ok := call.Fun.(*ast.SelectorExpr)
		if !ok {
			return true
		}
		switch sel.Sel.Name {
		case "keyFor":
			callsKeyFor = true
		case "ClientIP":
			callsClientIP = true
		}
		return true
	})

	if !callsKeyFor {
		t.Error("(*RateLimiter).Middleware must call rl.keyFor for SSOT bucket-key derivation — see issue #59. Found no keyFor call.")
	}
	if callsClientIP {
		t.Error("(*RateLimiter).Middleware carries a direct c.ClientIP() call. This is the parallel-impl drift issue #59 fixed. " +
			"Either route through rl.keyFor OR — if a new use case truly needs direct IP — extend keyFor's contract first and update this gate to allow the specific delta.")
	}
}

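// For reference, the shape the gate above expects Middleware to have is
// roughly the following. Illustrative only: the real method lives in
// ratelimit.go, its return type is assumed to be gin.HandlerFunc (it is
// mounted with r.Use in the router tests), and rl.allow is a stand-in name
// for whatever token-bucket check the limiter actually performs.
//
//	func (rl *RateLimiter) Middleware() gin.HandlerFunc {
//		return func(c *gin.Context) {
//			key := rl.keyFor(c) // SSOT bucket-key derivation — no direct c.ClientIP() here
//			if !rl.allow(key) { // hypothetical helper name
//				c.AbortWithStatus(http.StatusTooManyRequests)
//				return
//			}
//			c.Next()
//		}
//	}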