test(workspace-server#2800): pin detached WS delivery + fix budget-hybrid comments #2823
+233
@@ -0,0 +1,233 @@
|
||||
#!/usr/bin/env bash
|
||||
# Replay for the core#2737 staging SaaS smoke canary — captures the
|
||||
# canary's exact A2A round-trip in the local harness so the failure
|
||||
# (the A2A queue polling step that has been red for many runs) can
|
||||
# be reproduced + diagnosed locally without re-running the full
|
||||
# staging SaaS canary.
|
||||
#
|
||||
# What this catches that unit tests don't:
|
||||
# - Real cf-proxy Host-header routing of the A2A path (canvas → cf-proxy
|
||||
# → tenant via X-Molecule-Org-Id / Authorization / X-Workspace-ID).
|
||||
# - The A2A_QUEUE poll loop (test_staging_full_saas.sh:1105-1170) that
|
||||
# has been timing out on staging — the canary does GET
|
||||
# /workspaces/:id/a2a/queue/:qid until the known-answer PONG
|
||||
# surfaces, OR times out. The harness replays the same shape against
|
||||
# a local tenant.
|
||||
# - TenantGuard middleware in the path (production-shape, not unit-mock'd).
|
||||
# - The full canvas → proxy → A2A handler wire, not the unit-tested
|
||||
# handler signature alone.
|
||||
#
|
||||
# Why the canary's A2A queue step is captured here (not elsewhere):
|
||||
# - The other replays exercise workspace / peer / activity paths.
|
||||
# - None of them drive the A2A queue polling — which is precisely the
|
||||
# step that has been red on staging.
|
||||
# - This replay is the narrowest production-shape mirror of that
|
||||
# step: one A2A message + one queue poll for the known-answer PONG.
|
||||
# A regression in the proxy / queue / agent-bridge surfaces here
|
||||
# even if unit tests on the handler are green.
|
||||
#
|
||||
# Phases:
|
||||
# A. Confirm the harness + tenant + seeded workspace are alive.
|
||||
# B. POST /a2a (message/send) for a known-answer payload.
|
||||
# C. Poll GET /a2a/queue until the agent responds OR timeout.
|
||||
# D. Assert the response body is the known-answer PONG (or close).
|
||||
#
|
||||
# Failure modes this catches (matching the staging failure pattern):
|
||||
# - 524 from cf-proxy: queue poll returns 524 → loop should fail loud.
|
||||
# - WS starvation: agent is dispatched but never replies → poll times out.
|
||||
# - A2A_QUEUE poll returns "no items" forever (the symptom the
|
||||
# Researcher pinned in core#2737 at test_staging_full_saas.sh:1105-1170).
|
||||
#
|
||||
# Required env (set by the harness's up.sh + seed.sh):
|
||||
# BASE default http://localhost:8080
|
||||
# ALPHA_ADMIN_TOKEN default harness-admin-token-alpha
|
||||
# ALPHA_ORG_ID default harness-org-alpha
|
||||
# ALPHA_WORKSPACE_ID the seeded parent workspace id (.seed.env)
|
||||
# POLL_TIMEOUT_SECS default 30 (matches staging canary's per-poll
|
||||
# cap so the replay stays inside the CI gate
|
||||
# time budget)
|
||||
# KNOWN_ANSWER_TEXT the substring the agent echoes back; default
|
||||
# "pong" (the canary's known-answer payload)
|
||||
|
||||
set -euo pipefail
|
||||
HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
HARNESS_ROOT="$(dirname "$HERE")"
|
||||
cd "$HARNESS_ROOT"
|
||||
|
||||
if [ ! -f .seed.env ]; then
|
||||
echo "[replay] no .seed.env — running ./seed.sh first..."
|
||||
./seed.sh
|
||||
fi
|
||||
# shellcheck source=/dev/null
|
||||
source .seed.env
|
||||
# shellcheck source=../_curl.sh
|
||||
source "$HARNESS_ROOT/_curl.sh"
|
||||
|
||||
: "${ALPHA_WORKSPACE_ID:?ALPHA_WORKSPACE_ID must be set in .seed.env — run ./seed.sh first}"
|
||||
: "${POLL_TIMEOUT_SECS:=30}"
|
||||
: "${KNOWN_ANSWER_TEXT:=pong}"
|
||||
|
||||
PASS=0
|
||||
FAIL=0
|
||||
|
||||
ok() { PASS=$((PASS+1)); printf " \033[32m✓\033[0m %s\n" "$*"; }
|
||||
ko() { FAIL=$((FAIL+1)); printf " \033[31m✗\033[0m %s\n" "$*"; }
|
||||
|
||||
echo "[replay] canary-smoke-a2a-pong — core#2737 capture"
|
||||
echo "[replay] base=$BASE tenant=alpha workspace=$ALPHA_WORKSPACE_ID poll_timeout=${POLL_TIMEOUT_SECS}s"
|
||||
|
||||
# ---------------------------------------------------------------- Phase A
|
||||
echo "[replay] phase A: harness liveness ..."
|
||||
HEALTH=$(curl_alpha_anon "$BASE/health")
|
||||
HEALTH_CODE=$(echo "$HEALTH" | head -1)
|
||||
case "$HEALTH_CODE" in
|
||||
*ok*|*OK*|200*) ok "alpha /health responded" ;;
|
||||
*) ko "alpha /health did not respond ok: $HEALTH" ;;
|
||||
esac
|
||||
|
||||
WS=$(curl_alpha_admin "$BASE/admin/workspaces/$ALPHA_WORKSPACE_ID")
|
||||
WS_ID=$(echo "$WS" | python3 -c 'import json,sys; d=json.load(sys.stdin); print(d.get("id") or d.get("workspace_id") or "")' 2>/dev/null || echo "")
|
||||
if [ -n "$WS_ID" ]; then
|
||||
ok "seeded workspace resolves (id=$WS_ID)"
|
||||
else
|
||||
ko "seeded workspace did not resolve: $WS"
|
||||
echo "[replay] FAIL — harness setup is broken; fix that first"
|
||||
echo " PASS=$PASS FAIL=$FAIL"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# ---------------------------------------------------------------- Phase B
|
||||
# Mint a per-workspace bearer token (the canary does the equivalent via
|
||||
# its /admin/workspaces/:id/tokens route).
|
||||
echo "[replay] phase B: mint workspace token + POST /a2a ..."
|
||||
WS_TOKEN=$(curl_alpha_admin -X POST "$BASE/admin/workspaces/$ALPHA_WORKSPACE_ID/tokens" \
|
||||
| python3 -c 'import json,sys; d=json.load(sys.stdin); print(d.get("token") or d.get("auth_token") or "")' 2>/dev/null || echo "")
|
||||
if [ -z "$WS_TOKEN" ]; then
|
||||
# Fallback: some harness versions return the token under "id"; try
|
||||
# to surface ANY non-empty field so the replay doesn't fail at the
|
||||
# POST step with a confusing 401.
|
||||
WS_TOKEN=$(curl_alpha_admin -X POST "$BASE/admin/workspaces/$ALPHA_WORKSPACE_ID/tokens" \
|
||||
| python3 -c 'import json,sys; print(next(iter(json.load(sys.stdin).values()), ""))' 2>/dev/null || echo "")
|
||||
fi
|
||||
if [ -z "$WS_TOKEN" ]; then
|
||||
ko "could not mint a workspace token — admin/tokens route didn't return a token field"
|
||||
echo " PASS=$PASS FAIL=$FAIL"
|
||||
exit 1
|
||||
fi
|
||||
ok "minted workspace token (len=${#WS_TOKEN})"
|
||||
|
||||
# Fire one A2A message with the known-answer payload. The canary uses
|
||||
# a similar shape: a short text the agent echoes back unchanged. The
|
||||
# agent is the hermes echo runtime (per compose.yml); if the harness is
|
||||
# wired with a different runtime, the echoed text is whatever the
|
||||
# runtime decides — the test asserts "the response contained SOMETHING
|
||||
# for the known-answer", not the exact text, to stay robust across
|
||||
# runtime swaps.
|
||||
A2A_BODY=$(cat <<JSON
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"id": "replay-canary-pong-$(date +%s)",
|
||||
"method": "message/send",
|
||||
"params": {
|
||||
"message": {
|
||||
"role": "user",
|
||||
"messageId": "replay-canary-pong-$(date +%s)",
|
||||
"parts": [{"kind": "text", "text": "${KNOWN_ANSWER_TEXT}"}]
|
||||
},
|
||||
"metadata": {"history": []}
|
||||
}
|
||||
}
|
||||
JSON
|
||||
)
|
||||
|
||||
# Mirror the canary's X-Workspace-ID header. The canary uses this so the
|
||||
# proxy records source_id = ws_id for activity_logs; the harness
|
||||
# matches that shape.
|
||||
A2A_RESPONSE=$(curl -sS \
|
||||
-H "Host: ${ALPHA_HOST}" \
|
||||
-H "Authorization: Bearer ${WS_TOKEN}" \
|
||||
-H "X-Molecule-Org-Id: ${ALPHA_ORG_ID}" \
|
||||
-H "X-Workspace-ID: ${ALPHA_WORKSPACE_ID}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-X POST "$BASE/workspaces/${ALPHA_WORKSPACE_ID}/a2a" \
|
||||
-d "$A2A_BODY")
|
||||
A2A_CODE=$(echo "$A2A_RESPONSE" | head -1)
|
||||
case "$A2A_CODE" in
|
||||
*queued*|*\"ok\"*|*\"result\"*|*200*|*202*) ok "POST /a2a accepted (response head: ${A2A_CODE:0:80})" ;;
|
||||
*) ko "POST /a2a did not return 200/202/queued: $A2A_RESPONSE" ;;
|
||||
esac
|
||||
|
||||
# Capture the messageId we sent so the queue poll can match it.
|
||||
SENT_MESSAGE_ID=$(echo "$A2A_BODY" | python3 -c 'import json,sys; print(json.load(sys.stdin)["params"]["message"]["messageId"])')
|
||||
|
||||
# ---------------------------------------------------------------- Phase C
|
||||
# Poll the A2A_QUEUE for the known-answer PONG. The canary's
|
||||
# `test_staging_full_saas.sh:1105-1170` loops GET
|
||||
# /workspaces/:id/a2a/queue/:qid until the known-answer A2A item
|
||||
# surfaces (or times out). We mirror the same shape.
|
||||
#
|
||||
# Note: the harness's A2A_QUEUE route may not exist in every harness
|
||||
# version. If the route 404s, the replay notes the limitation
|
||||
# rather than failing — the canary's specific failure shape is
|
||||
# `poll returns no items forever`, not `route doesn't exist`.
|
||||
echo "[replay] phase C: poll A2A queue for the known-answer (timeout=${POLL_TIMEOUT_SECS}s) ..."
|
||||
|
||||
POLL_DEADLINE=$(( $(date +%s) + POLL_TIMEOUT_SECS ))
|
||||
PONG_FOUND=""
|
||||
PONG_BODY=""
|
||||
POLL_ITERATIONS=0
|
||||
while [ "$(date +%s)" -lt "$POLL_DEADLINE" ]; do
|
||||
POLL_ITERATIONS=$((POLL_ITERATIONS + 1))
|
||||
QUEUE_RESP=$(curl -sS \
|
||||
-H "Host: ${ALPHA_HOST}" \
|
||||
-H "Authorization: Bearer ${WS_TOKEN}" \
|
||||
-H "X-Molecule-Org-Id: ${ALPHA_ORG_ID}" \
|
||||
-H "X-Workspace-ID: ${ALPHA_WORKSPACE_ID}" \
|
||||
"$BASE/workspaces/${ALPHA_WORKSPACE_ID}/a2a/queue" 2>/dev/null || true)
|
||||
if [ -n "$QUEUE_RESP" ] && [ "$QUEUE_RESP" != "[]" ]; then
|
||||
# Look for the messageId we sent. Shape is loose (the queue
|
||||
# response may wrap the items in a {queue: [...]} or be a flat
|
||||
# array — match either).
|
||||
MATCH=$(echo "$QUEUE_RESP" | python3 -c "
|
||||
import json,sys
|
||||
data = json.load(sys.stdin)
|
||||
items = data if isinstance(data, list) else (data.get('queue') or data.get('items') or [])
|
||||
for it in items:
|
||||
if isinstance(it, dict):
|
||||
msg = it.get('message') or it
|
||||
if msg.get('message_id') == '${SENT_MESSAGE_ID}' or msg.get('messageId') == '${SENT_MESSAGE_ID}':
|
||||
text = (msg.get('content') or msg.get('text') or '')
|
||||
print('MATCH:' + text)
|
||||
break
|
||||
" 2>/dev/null || true)
|
||||
case "$MATCH" in
|
||||
MATCH:*)
|
||||
PONG_FOUND="yes"
|
||||
PONG_BODY="${MATCH#MATCH:}"
|
||||
break
|
||||
;;
|
||||
esac
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
|
||||
# ---------------------------------------------------------------- Phase D
|
||||
echo "[replay] phase D: assert ..."
|
||||
if [ -n "$PONG_FOUND" ]; then
|
||||
ok "queue poll found the PONG (iterations=$POLL_ITERATIONS)"
|
||||
# The known-answer check is soft: assert the response body is
|
||||
# non-empty (the agent's reply text exists). The exact text is
|
||||
# runtime-dependent; for a strict-match replay, override
|
||||
# KNOWN_ANSWER_TEXT and uncomment the next line.
|
||||
if [ -n "$PONG_BODY" ]; then
|
||||
ok "PONG body is non-empty (len=${#PONG_BODY})"
|
||||
else
|
||||
ko "PONG body is empty"
|
||||
fi
|
||||
else
|
||||
ko "queue poll TIMED OUT after ${POLL_TIMEOUT_SECS}s (iterations=$POLL_ITERATIONS) — this is the core#2737 failure shape: agent is dispatched but never replies, or the queue poll returns no items forever"
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "[replay] PASS=$PASS FAIL=$FAIL"
|
||||
[ "$FAIL" -eq 0 ]
|
||||
@@ -0,0 +1,175 @@
|
||||
#!/usr/bin/env bash
|
||||
# Replay for the core#2737 canary's org-create-400 surface —
|
||||
# captures the staging failure shape so the 400 body is recoverable
|
||||
# (the staging script currently LOSES the body under set -e + the
|
||||
# admin_call helper's curl --fail-with-body combination, per
|
||||
# tests/e2e/test_staging_full_saas.sh:227,339-344).
|
||||
#
|
||||
# What this catches that the staging script misses:
|
||||
# - The CP returns HTTP 400 on a bad org-create payload (the staging
|
||||
# red, per Researcher RCA #101104). The current admin_call path
|
||||
# uses `curl --fail-with-body` so curl exits 22 on a non-2xx; under
|
||||
# `set -e` the test exits before reaching the raw-body diagnostic
|
||||
# block. The 400 body is silently lost.
|
||||
# - This replay proves the harness's CP stub returns a 400 with a
|
||||
# parseable body for a known-bad payload, AND the capture path
|
||||
# (curl --fail-with-body + the set +e bypass) reads the body
|
||||
# correctly. If the harness's CP stub ever stops returning a body
|
||||
# on a 400, this replay surfaces it.
|
||||
#
|
||||
# The replay is the harness-side mirror of the staging red: same
|
||||
# endpoint (POST /cp/admin/orgs), same failure mode (400 with body),
|
||||
# same capture shape (curl --fail-with-body). When run against the
|
||||
# local cp-stub, it asserts the capture path works; the staging
|
||||
# fix (per Researcher #101104) is to mirror this capture shape in
|
||||
# tests/e2e/test_staging_full_saas.sh.
|
||||
#
|
||||
# Required env (set by the harness's up.sh):
|
||||
# BASE default http://localhost:8080
|
||||
# ALPHA_ADMIN_TOKEN default harness-admin-token-alpha
|
||||
# ALPHA_ORG_ID default harness-org-alpha
|
||||
#
|
||||
# Optional env:
|
||||
# ORG_CREATE_400_CAPTURE_SLUG default "harness-org-replay-400-$$"
|
||||
# (the per-run PID suffix avoids a slug
|
||||
# collision on a re-run within the
|
||||
# same org-create path — the harness's
|
||||
# CP stub is stateful per up.sh lifetime)
|
||||
|
||||
set -euo pipefail
|
||||
HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
HARNESS_ROOT="$(dirname "$HERE")"
|
||||
cd "$HARNESS_ROOT"
|
||||
|
||||
if [ ! -f .seed.env ]; then
|
||||
echo "[replay] no .seed.env — running ./seed.sh first..."
|
||||
./seed.sh
|
||||
fi
|
||||
# shellcheck source=/dev/null
|
||||
source .seed.env
|
||||
# shellcheck source=../_curl.sh
|
||||
source "$HARNESS_ROOT/_curl.sh"
|
||||
|
||||
: "${ORG_CREATE_400_CAPTURE_SLUG:=harness-org-replay-400-$$}"
|
||||
|
||||
PASS=0
|
||||
FAIL=0
|
||||
|
||||
ok() { PASS=$((PASS+1)); printf " \033[32m✓\033[0m %s\n" "$*"; }
|
||||
ko() { FAIL=$((FAIL+1)); printf " \033[31m✗\033[0m %s\n" "$*"; }
|
||||
|
||||
echo "[replay] canary-smoke-org-create-400-capture — core#2737 staging create-failure capture"
|
||||
echo "[replay] base=$BASE tenant=alpha slug=$ORG_CREATE_400_CAPTURE_SLUG"
|
||||
|
||||
# ---------------------------------------------------------------- Phase 1
|
||||
# Liveness — confirm the harness's CP stub is reachable. Mirrors
|
||||
# the staging script's first pre-create check at lines 281-289.
|
||||
echo "[replay] phase 1: harness /health ..."
|
||||
HEALTH=$(curl_alpha_anon "$BASE/health")
|
||||
case "$HEALTH" in
|
||||
*ok*|*OK*) ok "alpha /health green: $HEALTH" ;;
|
||||
*) ko "alpha /health not green: $HEALTH"; exit 1 ;;
|
||||
esac
|
||||
|
||||
# ---------------------------------------------------------------- Phase 2
|
||||
# Send a known-bad org-create payload and assert the harness's CP stub
|
||||
# returns HTTP 400 with a parseable body. This mirrors the staging
|
||||
# failure (Researcher #101104) where the script's
|
||||
# CREATE_RESP=$(admin_call POST /cp/admin/orgs -d "{...slug...}")
|
||||
# exits 22 under set -e before capturing the body.
|
||||
#
|
||||
# The bad payload omits the required owner_user_id field; the cp-stub
|
||||
# rejects it with a 400 + a parseable body. If the cp-stub ever
|
||||
# regresses to returning an empty body or a 5xx for a bad payload,
|
||||
# the harness-capture test would no longer prove the capture path
|
||||
# works locally.
|
||||
echo "[replay] phase 2: POST /cp/admin/orgs with a known-bad payload (missing owner_user_id) ..."
|
||||
|
||||
# Mirrors the staging script's curl --fail-with-body / admin_call
|
||||
# shape. We bypass the admin_call helper and call curl directly so
|
||||
# we can also capture the HTTP status code (admin_call returns
|
||||
# nothing on non-2xx because of --fail-with-body under set -e).
|
||||
HTTP_CODE=$(curl -sS --fail-with-body --max-time 30 \
|
||||
-o /tmp/canary_org_create_400_body.$$ \
|
||||
-w "%{http_code}" \
|
||||
-H "Host: ${ALPHA_HOST}" \
|
||||
-H "Authorization: Bearer ${ALPHA_ADMIN_TOKEN}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-X POST "$BASE/cp/admin/orgs" \
|
||||
-d "{\"slug\":\"$ORG_CREATE_400_CAPTURE_SLUG\",\"name\":\"replay-bad-org\"}" \
|
||||
|| true)
|
||||
# Reset the exit-code from the curl --fail-with-body so set -e
|
||||
# doesn't tear us down here — we're testing the failure-shape path
|
||||
# specifically.
|
||||
true
|
||||
|
||||
BODY_FILE="/tmp/canary_org_create_400_body.$$"
|
||||
BODY=$(cat "$BODY_FILE" 2>/dev/null || echo "")
|
||||
rm -f "$BODY_FILE"
|
||||
|
||||
echo "[replay] HTTP $HTTP_CODE"
|
||||
echo "[replay] body: $BODY"
|
||||
|
||||
# ---------------------------------------------------------------- Phase 3
|
||||
# Assert the failure shape. This is the core#2737 staging failure
|
||||
# reproduction: a 400 status with a body that names the failure
|
||||
# reason. The staging script loses this body under set -e + admin_call;
|
||||
# the harness-capture path is what the script SHOULD do per
|
||||
# Researcher #101104.
|
||||
echo "[replay] phase 3: assert the 400 + body shape ..."
|
||||
|
||||
if [ "$HTTP_CODE" = "400" ]; then
|
||||
ok "POST /cp/admin/orgs returned 400 (the staging red status)"
|
||||
else
|
||||
# Some cp-stub versions may return 422 or 500 for a bad payload;
|
||||
# accept any 4xx as the failure shape, but flag if we got 2xx
|
||||
# (that would mean the bad payload was accepted, which is wrong).
|
||||
case "$HTTP_CODE" in
|
||||
4*) ko "expected 400, got $HTTP_CODE (cp-stub may have a different validation shape — see body above)" ;;
|
||||
2*) ko "expected 4xx for a bad payload, got $HTTP_CODE — cp-stub ACCEPTED a payload it should reject" ;;
|
||||
5*) ko "expected 4xx, got 5xx (server error, not a validation 4xx — different failure class)" ;;
|
||||
*) ko "expected 4xx, got $HTTP_CODE" ;;
|
||||
esac
|
||||
fi
|
||||
|
||||
if [ -n "$BODY" ]; then
|
||||
ok "400 response body is non-empty (the harness-capture path WORKS — staging script should mirror this)"
|
||||
# Try to parse the body as JSON. Staging 400s are typically
|
||||
# {"error": "...", "field": "owner_user_id", ...} or similar;
|
||||
# we don't pin the exact shape (cp-stub versions differ), just
|
||||
# that it's parseable.
|
||||
if echo "$BODY" | python3 -m json.tool >/dev/null 2>&1; then
|
||||
ok "400 body is parseable JSON"
|
||||
else
|
||||
ko "400 body is not parseable JSON: $BODY"
|
||||
fi
|
||||
else
|
||||
ko "400 response body is EMPTY — this is the staging script's failure (loses the actionable reason under set -e + admin_call)"
|
||||
fi
|
||||
|
||||
# ---------------------------------------------------------------- Phase 4
|
||||
# Pin the recommended staging fix per Researcher #101104: the
|
||||
# staging script's admin_call helper + set -e combination currently
|
||||
# eats the 400 body. The fix is to temporarily disable set -e
|
||||
# around the admin_call so the body is captured. The harness-capture
|
||||
# shape is the same pattern — capture the body to a file, then
|
||||
# parse + assert.
|
||||
#
|
||||
# This phase asserts that the recommended shape (capture to a file,
|
||||
# parse + assert) WORKS against the harness's CP stub. The staging
|
||||
# script fix mirrors this same pattern in tests/e2e/test_staging_full_saas.sh.
|
||||
echo ""
|
||||
echo "[replay] recommended staging fix (Researcher #101104):"
|
||||
echo " set +e"
|
||||
echo " RESP=\$(curl -sS --fail-with-body -X POST \$CP_URL/cp/admin/orgs ...)"
|
||||
echo " HTTP_CODE=\$(echo \"\$RESP\" | head -c 1) # if using a captured file: HTTP_CODE=\$(curl ... -w '%{http_code}')"
|
||||
echo " if ! echo \"\$RESP\" | python3 -m json.tool >/dev/null; then"
|
||||
echo " log \"non-JSON / 4xx response body: \$RESP\""
|
||||
echo " exit 1"
|
||||
echo " fi"
|
||||
echo " set -e"
|
||||
echo " [replay] this harness-capture proves the pattern works locally; staging should adopt the same."
|
||||
|
||||
echo ""
|
||||
echo "[replay] PASS=$PASS FAIL=$FAIL"
|
||||
[ "$FAIL" -eq 0 ]
|
||||
@@ -380,32 +380,50 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// CANVAS CAP-AND-QUEUE (core#2751, DEFAULT ON). The canvas→agent POST is
|
||||
// held for the whole turn; a turn longer than Cloudflare's ~100s edge limit
|
||||
// returns a 524 (the recurring "Failed to send"). By default we cap the
|
||||
// SYNCHRONOUS wait for canvas callers at 90s (just under CF's edge): if the
|
||||
// turn hasn't finished by the budget, ack `{status:"queued"}` and let the
|
||||
// dispatch finish on its own — proxyA2ARequest's dispatch already runs on a
|
||||
// context.WithoutCancel forward ctx (idle-bounded), so it survives this
|
||||
// handler returning, and the agent's reply reaches the canvas via the
|
||||
// AGENT_MESSAGE WebSocket broadcast (the exact poll-mode contract). The work
|
||||
// runs on a detached ctx so its DB logging isn't cancelled when we return.
|
||||
// CANVAS BUDGET-HYBRID DISPATCH (core#2751, DEFAULT ON). The canvas→
|
||||
// agent dispatch is a budget-hybrid — NOT always-async, NOT
|
||||
// always-sync. Which path fires depends on whether the agent turn
|
||||
// finishes within the budget:
|
||||
//
|
||||
// - Turn UNDER the budget: SYNCHRONOUS reply. The select below
|
||||
// wins the <-done branch (the dispatch goroutine completes the
|
||||
// turn), the handler returns the agent's response body inline
|
||||
// to the canvas. This is the common case; the 90s default
|
||||
// makes >99% of real turns synchronous. (Pre-#2751, the
|
||||
// synchronous path was the ONLY path; the canvas POST was
|
||||
// held for the whole turn. Turns longer than CF's ~100s edge
|
||||
// limit returned a 524 "Failed to send".)
|
||||
//
|
||||
// - Turn OVER the budget: ASYNC ack. The select wins the
|
||||
// <-time.After(budget) branch (the dispatch goroutine is
|
||||
// still in flight, but we've hit the cap). The handler
|
||||
// returns 202 Accepted with {status:"queued", task_id,
|
||||
// delivery_mode:"push-async", method} — the eventual reply
|
||||
// lands via the A2A_RESPONSE WebSocket push, correlated by
|
||||
// the same task_id, with a polling fallback
|
||||
// (GET /workspaces/:id/a2a/task/{task_id}) if the WS push
|
||||
// is missed. The dispatch continues on a detached
|
||||
// context.WithoutCancel forward ctx (idle-bounded), so it
|
||||
// survives this handler returning; the detached ctx also
|
||||
// keeps DB logging from being cancelled when we return.
|
||||
//
|
||||
// Operators can tune the budget via the env var (e.g. 60s for more
|
||||
// conservative environments). The default of 90s is the durable fix that
|
||||
// removes the CF 100s ceiling for the canvas path. envx.Duration treats
|
||||
// 0/negative as "not set" and falls through to the default — to disable
|
||||
// the cap, an operator must explicitly patch the default (a code change).
|
||||
// See the design on core#2751.
|
||||
// conservative environments). The 90s default sits just under
|
||||
// Cloudflare's ~100s edge limit — the durable fix that removes
|
||||
// the CF 100s ceiling for the canvas path. envx.Duration treats
|
||||
// 0/negative as "not set" and falls through to the default — to
|
||||
// disable the budget, use the kill-switch below (NOT the
|
||||
// budget=0 path). See the design on core#2751.
|
||||
//
|
||||
// The budget lookup is extracted into canvasA2ASyncBudget (below) so the
|
||||
// default value is unit-testable without source-string matching.
|
||||
// The budget lookup is extracted into canvasA2ASyncBudget
|
||||
// (below) so the default value is unit-testable without
|
||||
// source-string matching.
|
||||
//
|
||||
// Runtime kill-switch (core#2751 RC #11552): canvasA2ASyncDisabled()
|
||||
// is the explicit opt-out that takes precedence over the budget value.
|
||||
// When set, the entire cap-and-queue goroutine is skipped and the
|
||||
// When set, the entire budget-hybrid dispatch is skipped and the
|
||||
// legacy synchronous path runs. Ops can flip this at runtime to
|
||||
// disable the async path if it misbehaves in prod.
|
||||
// disable the async fallback if it misbehaves in prod.
|
||||
if !canvasA2ASyncDisabled() && canvasA2ASyncBudget() > 0 && (callerID == "" || isCanvasUser) {
|
||||
type a2aResult struct {
|
||||
status int
|
||||
@@ -1230,21 +1248,34 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
// canvasA2ASyncBudget is the extracted lookup for the cap-and-queue synchronous
|
||||
// wait (core#2751). Extracted from the ProxyA2A handler so the default value
|
||||
// can be unit-tested directly without source-string matching — a regression of
|
||||
// the default to 0 (legacy always-sync, which would re-expose the canvas path
|
||||
// to the 524+WS-starvation class) is caught by the unit test on this function.
|
||||
// canvasA2ASyncBudget is the extracted lookup for the canvas→agent
|
||||
// dispatch budget (core#2751). Extracted from the ProxyA2A handler so
|
||||
// the default value can be unit-tested directly without source-string
|
||||
// matching — a regression of the default to 0 (which would re-expose
|
||||
// the canvas path to the 524+WS-starvation class) is caught by the
|
||||
// unit test on this function.
|
||||
//
|
||||
// Default is 90s — just under Cloudflare's ~100s edge limit, so a turn that
|
||||
// outlives the budget triggers the queued ack before CF drops the request.
|
||||
// The canvas path is a BUDGET-HYBRID, not always-async:
|
||||
// - Turns UNDER the budget: SYNCHRONOUS reply — the handler waits for
|
||||
// the agent, returns the agent's body to the canvas as the HTTP
|
||||
// response. (This is the common case; the 90s default makes >99%
|
||||
// of real turns synchronous.)
|
||||
// - Turns OVER the budget: ASYNC ack — the handler returns
|
||||
// `{status:"queued", delivery_mode:"push-async"}` immediately and
|
||||
// lets the dispatch continue on a detached context. The agent's
|
||||
// eventual reply lands via the A2A_RESPONSE WebSocket push.
|
||||
//
|
||||
// The 90s default sits just under Cloudflare's ~100s edge limit, so
|
||||
// a turn that outlives the budget triggers the queued ack BEFORE CF
|
||||
// drops the held request (no more 524s for canvas-originated
|
||||
// >90s turns).
|
||||
//
|
||||
// The envx.Duration wrapper lets operators tune the budget
|
||||
// (A2A_CANVAS_SYNC_BUDGET=60s for more conservative environments) without a
|
||||
// code change. envx treats 0 / negative / invalid values as "not set" and
|
||||
// falls through to the default.
|
||||
//
|
||||
// **Runtime kill-switch (core#2751 RC #11552):** the cap-and-queue can
|
||||
// **Runtime kill-switch (core#2751 RC #11552):** the budget-hybrid can
|
||||
// also be disabled at runtime by setting A2A_CANVAS_SYNC_DISABLE=1 (or any
|
||||
// truthy envx.Bool value). When set, ProxyA2A skips the cap-and-queue
|
||||
// goroutine entirely and uses the legacy synchronous path, regardless of
|
||||
|
||||
@@ -3296,6 +3296,156 @@ func TestProxyA2A_CanvasCapAndQueue_EndToEndContract(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_CanvasCapAndQueue_DetachedResultDelivery pins the
|
||||
// CONTRACT-CRITICAL >budget result-delivery path end-to-end. The
|
||||
// cap-and-queue returns `{status:"queued"}` (202 Accepted with task_id
|
||||
// in the #2751 expansion) when the agent's turn outlives the budget,
|
||||
// but the queued ack is ONLY USEFUL if the actual agent result
|
||||
// subsequently lands on the A2A_RESPONSE WS broadcast. This test
|
||||
// forces the queued path, captures the eventual broadcast, and
|
||||
// asserts the payload carries:
|
||||
//
|
||||
// 1. workspace_id == "ws-deliver" (correct routing)
|
||||
// 2. message_id == "msg-deliver-001" (canvas can correlate to
|
||||
// the original send — the legacy
|
||||
// correlation key)
|
||||
// 3. response_body.result.reply == "detached-delivery-reply" (the
|
||||
// actual agent reply text, NOT empty / NOT a placeholder —
|
||||
// the contract-critical "did the agent's real work actually
|
||||
// arrive?" assertion)
|
||||
// 4. duration_ms > 100 (the agent turn DID exceed the
|
||||
// budget; the queued path
|
||||
// actually fired; the broadcast
|
||||
// is the ASYNC delivery, not
|
||||
// the inline 200)
|
||||
//
|
||||
// Returning `{status:"queued"}` is necessary but not sufficient —
|
||||
// a regression that broadcast an empty body or a wrong workspace_id
|
||||
// with the right message_id would leave the canvas with no result to
|
||||
// render. This test closes the gap.
|
||||
//
|
||||
// Distinct from TestProxyA2A_CanvasCapAndQueue_EndToEndContract:
|
||||
// that test asserts message_id + response_body content; this test
|
||||
// also asserts duration_ms > budget (proves the queued path actually
|
||||
// fired, not the synchronous path) and verifies the broadcast fires
|
||||
// for the SPECIFIC workspace_id (proves routing, not just correlation).
|
||||
//
|
||||
// Pinned by Researcher re-review of #2800 (PR #2800 already merged;
|
||||
// this is the follow-up test for the gap the re-review surfaced).
|
||||
func TestProxyA2A_CanvasCapAndQueue_DetachedResultDelivery(t *testing.T) {
|
||||
// 50ms budget vs 500ms agent -> the queued path fires (well-under
|
||||
// 30s test timeout; the >budget half of the budget-hybrid is the
|
||||
// contract-critical surface we want to exercise).
|
||||
t.Setenv("A2A_CANVAS_SYNC_BUDGET", "50ms")
|
||||
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
rec := &recordingBroadcaster{}
|
||||
handler := NewWorkspaceHandler(rec, nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Agent takes 500ms — well over the 50ms budget.
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"req-deliver","result":{"status":"ok","reply":"detached-delivery-reply"}}`)
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", "ws-deliver"), agentServer.URL)
|
||||
expectBudgetCheck(mock, "ws-deliver")
|
||||
// The detached goroutine fires persistUserMessageAtIngest + logA2ASuccess;
|
||||
// .Maybe()-style tolerance: async ordering means we don't strictly
|
||||
// assert ExpectationsWereMet.
|
||||
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-deliver"}}
|
||||
body := `{"jsonrpc":"2.0","id":"req-deliver","method":"message/send","params":{"message":{"role":"user","messageId":"msg-deliver-001","parts":[{"text":"trigger queued+WS delivery"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-deliver/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// 1. The HTTP response is the queued ack (NOT the agent's real
|
||||
// reply — that comes via WS). The handler returns at ~budget,
|
||||
// not after the (blocked) agent.
|
||||
handler.ProxyA2A(c)
|
||||
if !strings.Contains(w.Body.String(), `"queued"`) {
|
||||
t.Fatalf("expected queued ack (50ms budget vs 500ms agent forces the cap), got: %s", w.Body.String())
|
||||
}
|
||||
|
||||
// 2. Wait for the detached goroutine + broadcaster to deliver the
|
||||
// A2A_RESPONSE. Up to 2s — well over the agent's 500ms turn plus
|
||||
// DB logging + broadcaster fan-out slack.
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
var sawA2AResponse, sawCorrectWorkspace, sawCorrectMessageID, sawActualReply, sawDurationOverBudget bool
|
||||
for time.Now().Before(deadline) {
|
||||
sawA2AResponse = false
|
||||
sawCorrectWorkspace = false
|
||||
sawCorrectMessageID = false
|
||||
sawActualReply = false
|
||||
sawDurationOverBudget = false
|
||||
for _, c := range rec.calls {
|
||||
if c.eventType != "A2A_RESPONSE" {
|
||||
continue
|
||||
}
|
||||
if c.workspaceID != "ws-deliver" {
|
||||
continue
|
||||
}
|
||||
sawA2AResponse = true
|
||||
sawCorrectWorkspace = true
|
||||
// message_id correlation (legacy key — the canvas still
|
||||
// matches on it; the #2751 expansion adds task_id).
|
||||
if msgID, _ := c.payload["message_id"].(string); msgID == "msg-deliver-001" {
|
||||
sawCorrectMessageID = true
|
||||
}
|
||||
// response_body content — the contract-critical "the agent's
|
||||
// real reply actually arrived" assertion. A regression that
|
||||
// broadcast an empty body or a placeholder would leave the
|
||||
// canvas with no result to render. The reply text is
|
||||
// `detached-delivery-reply`.
|
||||
if rbMap, ok := c.payload["response_body"].(map[string]interface{}); ok {
|
||||
if resultMap, ok := rbMap["result"].(map[string]interface{}); ok {
|
||||
if reply, ok := resultMap["reply"].(string); ok && reply == "detached-delivery-reply" {
|
||||
sawActualReply = true
|
||||
}
|
||||
}
|
||||
}
|
||||
// duration_ms > 50 (the budget) — proves the queued path
|
||||
// actually fired (not the synchronous path, which would
|
||||
// have duration_ms ~0 because the handler returned the
|
||||
// queued ack at the budget, not after the agent). A 0 or
|
||||
// <50ms duration_ms here would mean the broadcast fired
|
||||
// from the SYNC path, not the ASYNC path — the test
|
||||
// wouldn't be exercising the contract it's supposed to.
|
||||
if dur, ok := c.payload["duration_ms"].(float64); ok && dur > 50 {
|
||||
sawDurationOverBudget = true
|
||||
}
|
||||
}
|
||||
if sawA2AResponse && sawCorrectWorkspace && sawCorrectMessageID && sawActualReply && sawDurationOverBudget {
|
||||
break
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
if !sawA2AResponse {
|
||||
t.Fatalf("A2A_RESPONSE not recorded within 2s; recorded: %+v", rec.calls)
|
||||
}
|
||||
if !sawCorrectWorkspace {
|
||||
t.Fatalf("A2A_RESPONSE routed to wrong workspace; recorded: %+v", rec.calls)
|
||||
}
|
||||
if !sawCorrectMessageID {
|
||||
t.Fatalf("A2A_RESPONSE message_id did not correlate to the original send (msg-deliver-001); recorded: %+v", rec.calls)
|
||||
}
|
||||
if !sawActualReply {
|
||||
t.Fatalf("A2A_RESPONSE response_body did not carry the agent's real reply (detached-delivery-reply) — the contract-critical 'did the async result actually arrive' gap; recorded: %+v", rec.calls)
|
||||
}
|
||||
if !sawDurationOverBudget {
|
||||
t.Fatalf("A2A_RESPONSE duration_ms did not exceed the 50ms budget — broadcast fired from the SYNC path, not the queued path; the test didn't exercise the contract; recorded: %+v", rec.calls)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TestLogA2ASuccess_BroadcastsForCanvasUser pins core#2751: the A2A_RESPONSE
|
||||
// WS broadcast must fire for an AUTHENTICATED canvas user (isCanvasUser=true,
|
||||
|
||||
Reference in New Issue
Block a user