fix(e2e): poll GetA2AQueueStatus on 202-queued A2A responses (core#2437 part B) #2708

Merged
devops-engineer merged 1 commits from fix/2437-a2a-ready-boundary-poll into main 2026-06-13 05:39:32 +00:00
+151 -160
View File
@@ -1088,6 +1088,152 @@ done
# externally routable readiness boundary again.
wait_workspaces_online_routable "7d/11 Waiting for workspace(s) to recover routing after config.yaml PUT..." "${WS_TO_CHECK[@]}"
# ─── A2A send with 202-queued poll helper (core#2437 part B) ───────────
# Sends POST /workspaces/:id/a2a. If the agent is busy/starting and returns
# a 2xx with queued:true + queue_id, poll GetA2AQueueStatus until the durable
# result is available. Handles curl rc 28 / http 000 / 404 retryable while the
# queue row is still materializing, and transient 502/503/504 cold-start.
# Prints the final A2A JSON-RPC response body to stdout; logs to stderr.
a2a_send_or_poll_queue() {
local ws_id="$1"; shift
local payload="$1"; shift
local label="$1"
local tmp qid resp code rc attempt poll_attempt poll_tmp
tmp=$(mktemp -t a2a_poll.XXXXXX)
qid=""
for attempt in $(seq 1 12); do
if [ -n "$qid" ]; then
# We have a queue_id — poll GetA2AQueueStatus for the durable result.
poll_tmp=$(mktemp -t a2a_qpoll.XXXXXX)
for poll_attempt in $(seq 1 30); do
: >"$poll_tmp"
set +e
code=$(tenant_call GET "/workspaces/$ws_id/a2a/queue/$qid" \
--max-time 30 \
-H "X-Workspace-ID: $ws_id" \
-o "$poll_tmp" \
-w '%{http_code}' \
2>/dev/null)
rc=$?
set -e
code=${code:-000}
resp=$(cat "$poll_tmp" 2>/dev/null || echo "")
if [ "$rc" != "0" ] || [ "$code" = "000" ] || [ "$code" = "404" ]; then
echo " $label queue poll attempt $poll_attempt/30: curl_rc=$rc http=$code — retryable, backing off 2s" >&2
sleep 2
continue
fi
if [ "$code" -lt 200 ] || [ "$code" -ge 300 ]; then
rm -f "$poll_tmp" "$tmp"
fail "$label queue poll failed (http=$code): $(printf '%s' "$resp" | sanitize_http_body)"
fi
local qstatus
qstatus=$(printf '%s' "$resp" | python3 -c "
import json,sys
try:
print(json.load(sys.stdin).get('status',''))
except Exception:
print('')" 2>/dev/null || echo "")
case "$qstatus" in
completed)
resp=$(printf '%s' "$resp" | python3 -c "
import json,sys
try:
rb=json.load(sys.stdin).get('response_body')
print(json.dumps(rb) if rb is not None else '')
except Exception:
print('')" 2>/dev/null || echo "")
if [ -n "$resp" ]; then
code=200
break 2
fi
;;
failed|dropped)
rm -f "$poll_tmp" "$tmp"
fail "$label queue item $qid terminal status=$qstatus: $(printf '%s' "$resp" | sanitize_http_body)"
;;
queued|dispatched|in_progress|"")
echo " $label queue poll attempt $poll_attempt/30 status=$qstatus — backing off 2s" >&2
sleep 2
;;
*)
rm -f "$poll_tmp" "$tmp"
fail "$label queue poll unexpected status=$qstatus: $(printf '%s' "$resp" | sanitize_http_body)"
;;
esac
done
rm -f "$poll_tmp"
# Ran out of queue poll attempts.
fail "$label queue poll timed out waiting for $qid to complete"
fi
# Initial POST (or retry before we have a queue_id).
: >"$tmp"
set +e
code=$(tenant_call POST "/workspaces/$ws_id/a2a" \
--max-time 90 \
-H "Content-Type: application/json" \
-H "X-Workspace-ID: $ws_id" \
-d "$payload" \
-o "$tmp" \
-w '%{http_code}' \
2>/dev/null)
rc=$?
set -e
code=${code:-000}
resp=$(cat "$tmp" 2>/dev/null || echo "")
if [ "$rc" = "0" ] && [ "$code" -ge 200 ] && [ "$code" -lt 300 ]; then
local is_queued
is_queued=$(printf '%s' "$resp" | python3 -c "
import json,sys
try:
d=json.load(sys.stdin)
print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false')
except Exception:
print('false')" 2>/dev/null || echo "false")
if [ "$is_queued" = "true" ]; then
qid=$(printf '%s' "$resp" | python3 -c "
import json,sys
try:
print(json.load(sys.stdin).get('queue_id',''))
except Exception:
print('')" 2>/dev/null || echo "")
if [ -n "$qid" ]; then
echo " $label A2A queued (queue_id=$qid); switching to poll" >&2
continue
fi
else
break
fi
fi
local safe_body
safe_body=$(printf '%s' "$resp" | sanitize_http_body)
if echo "$code" | grep -Eq '^(502|503|504)$' && echo "$safe_body" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then
echo " $label A2A transient $code attempt $attempt/12: $safe_body" >&2
if [ "$attempt" -lt 12 ]; then
local sleep_sec=10
if echo "$safe_body" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then
sleep_sec=30
fi
sleep "$sleep_sec"
continue
fi
fi
break
done
rm -f "$tmp"
if [ "$rc" != "0" ] || [ "$code" -lt 200 ] || [ "$code" -ge 300 ]; then
fail "$label failed after $attempt attempt(s) (curl_rc=$rc, http=$code): $(printf '%s' "$resp" | sanitize_http_body)"
fi
printf '%s' "$resp"
}
# ─── 8. A2A round-trip on parent ───────────────────────────────────────
log "8/11 Sending A2A message to parent — expecting agent response..."
# Smoke prompt phrasing — DO NOT trim back to the bare "Reply with exactly: PONG"
@@ -1127,44 +1273,9 @@ print(json.dumps({
# 90s gives ~3x headroom over observed cold-call P95 (~25-30s).
# Subsequent A2A turns hit the same workspace and are sub-second, so
# this only widens the window for step 8/11 of the canary's first turn.
A2A_TMP=$(mktemp -t synth_a2a.XXXXXX)
for A2A_ATTEMPT in $(seq 1 12); do
: >"$A2A_TMP"
set +e
A2A_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \
--max-time 90 \
-H "Content-Type: application/json" \
-d "$A2A_PAYLOAD" \
-o "$A2A_TMP" \
-w '%{http_code}' \
2>/dev/null)
A2A_RC=$?
set -e
A2A_CODE=${A2A_CODE:-000}
A2A_RESP=$(cat "$A2A_TMP" 2>/dev/null || echo "")
if [ "$A2A_RC" = "0" ] && [ "$A2A_CODE" -ge 200 ] && [ "$A2A_CODE" -lt 300 ]; then
break
fi
A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body)
if echo "$A2A_CODE" | grep -Eq '^(502|503|504)$' && echo "$A2A_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then
log " A2A cold-start probe attempt $A2A_ATTEMPT/12 returned $A2A_CODE: $A2A_SAFE_BODY"
if [ "$A2A_ATTEMPT" -lt 12 ]; then
A2A_SLEEP=10
if echo "$A2A_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session'; then
A2A_SLEEP=30
fi
sleep "$A2A_SLEEP"
continue
fi
fi
break
done
rm -f "$A2A_TMP"
if [ "$A2A_RC" != "0" ] || [ "$A2A_CODE" -lt 200 ] || [ "$A2A_CODE" -ge 300 ]; then
A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body)
fail "A2A POST /workspaces/$PARENT_ID/a2a failed after $A2A_ATTEMPT attempt(s) (curl_rc=$A2A_RC, http=$A2A_CODE): $A2A_SAFE_BODY"
fi
# core#2437 part B: send A2A and, if the agent is busy/starting, poll the
# queue status endpoint until the durable result is available.
A2A_RESP=$(a2a_send_or_poll_queue "$PARENT_ID" "$A2A_PAYLOAD" "A2A parent")
AGENT_TEXT=$(echo "$A2A_RESP" | python3 -c "
import json, sys
d = json.load(sys.stdin)
@@ -1280,128 +1391,8 @@ print(json.dumps({
}
}))
")
KA_TMP=$(mktemp -t known_answer_a2a.XXXXXX)
KA_RESP=""
KA_QUEUE_ID=""
for KA_ATTEMPT in $(seq 1 12); do
: >"$KA_TMP"
if [ -n "$KA_QUEUE_ID" ]; then
# We already have a queued work item — poll its status, don't re-POST.
# Re-POSTing while the original queued work is still pending collides with
# the native-session busy/drain window and reproduces the 502/empty failure.
set +e
KA_CODE=$(tenant_call GET "/workspaces/$PARENT_ID/a2a/queue/$KA_QUEUE_ID" \
--max-time 90 \
-o "$KA_TMP" \
-w '%{http_code}' \
2>/dev/null)
KA_RC=$?
set -e
KA_CODE=${KA_CODE:-000}
KA_RESP=$(cat "$KA_TMP" 2>/dev/null || echo "")
if [ "$KA_RC" = "0" ] && [ "$KA_CODE" -ge 200 ] && [ "$KA_CODE" -lt 300 ]; then
KA_Q_STATUS=$(echo "$KA_RESP" | python3 -c "
import json,sys
try:
print(json.load(sys.stdin).get('status',''))
except Exception:
print('')
" 2>/dev/null || echo "")
if [ "$KA_Q_STATUS" = "completed" ]; then
# The queue row stores the agent's A2A response in response_body.
KA_RESP=$(echo "$KA_RESP" | python3 -c "
import json,sys
try:
rb=json.load(sys.stdin).get('response_body')
print(json.dumps(rb) if rb is not None else '')
except Exception:
print('')
" 2>/dev/null || echo "")
if [ -n "$KA_RESP" ]; then
KA_CODE=200
break
fi
fi
# Still pending — poll again.
if echo "$KA_Q_STATUS" | grep -Eqi 'queued|dispatched|in_progress'; then
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
log " known-answer queue poll attempt $KA_ATTEMPT/12 status=$KA_Q_STATUS: $KA_SAFE_BODY"
if [ "$KA_ATTEMPT" -lt 12 ]; then sleep 10; continue; fi
fi
fi
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then
log " known-answer queue poll transient $KA_CODE attempt $KA_ATTEMPT/12: $KA_SAFE_BODY"
if [ "$KA_ATTEMPT" -lt 12 ]; then
KA_SLEEP=10
if echo "$KA_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then
KA_SLEEP=30
fi
sleep "$KA_SLEEP"
continue
fi
fi
break
else
# Initial POST (or retry before we have a queue_id). A 202 queued response
# transitions to the poll path above; a terminal 2xx breaks immediately.
set +e
KA_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \
--max-time 90 \
-H "Content-Type: application/json" \
-d "$KA_PAYLOAD" \
-o "$KA_TMP" \
-w '%{http_code}' \
2>/dev/null)
KA_RC=$?
set -e
KA_CODE=${KA_CODE:-000}
KA_RESP=$(cat "$KA_TMP" 2>/dev/null || echo "")
if [ "$KA_RC" = "0" ] && [ "$KA_CODE" -ge 200 ] && [ "$KA_CODE" -lt 300 ]; then
KA_IS_QUEUED=$(echo "$KA_RESP" | python3 -c "
import json,sys
try:
d=json.load(sys.stdin)
print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false')
except Exception:
print('false')
" 2>/dev/null || echo "false")
if [ "$KA_IS_QUEUED" = "true" ]; then
KA_QUEUE_ID=$(echo "$KA_RESP" | python3 -c "
import json,sys
try:
print(json.load(sys.stdin).get('queue_id',''))
except Exception:
print('')
" 2>/dev/null || echo "")
if [ -n "$KA_QUEUE_ID" ]; then
log " known-answer A2A queued (queue_id=$KA_QUEUE_ID); switching to poll"
continue
fi
else
break
fi
fi
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then
log " known-answer A2A transient $KA_CODE attempt $KA_ATTEMPT/12: $KA_SAFE_BODY"
if [ "$KA_ATTEMPT" -lt 12 ]; then
KA_SLEEP=10
if echo "$KA_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then
KA_SLEEP=30
fi
sleep "$KA_SLEEP"
continue
fi
fi
break
fi
done
rm -f "$KA_TMP"
if [ "$KA_RC" != "0" ] || [ "$KA_CODE" -lt 200 ] || [ "$KA_CODE" -ge 300 ]; then
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
fail "Known-answer A2A failed after $KA_ATTEMPT attempt(s) (curl_rc=$KA_RC, http=$KA_CODE): $KA_SAFE_BODY"
fi
# core#2437 part B: send A2A and poll queue status if the agent queues it.
KA_RESP=$(a2a_send_or_poll_queue "$PARENT_ID" "$KA_PAYLOAD" "A2A known-answer")
KA_TEXT=$(echo "$KA_RESP" | python3 -c "
import json, sys
def extract_text(d):